Skip to content

Commit

Permalink
rebase and update the unbind and bind jobs to accept state updates ad…
Browse files Browse the repository at this point in the history
…d the sync and async job start to unbind and bind. Minor updates
  • Loading branch information
maleck13 committed Jan 23, 2018
1 parent 379144a commit 0175a9a
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 163 deletions.
1 change: 0 additions & 1 deletion pkg/apb/provision_or_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"errors"
"fmt"

"github.com/apex/log"
"github.com/openshift/ansible-service-broker/pkg/clients"
"github.com/openshift/ansible-service-broker/pkg/metrics"
"github.com/openshift/ansible-service-broker/pkg/runtime"
Expand Down
4 changes: 2 additions & 2 deletions pkg/apb/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ type Deprovisioner func(si *ServiceInstance, statusUpdate chan<- JobState) (stri
type Updater func(si *ServiceInstance, statusUpdate chan<- JobState) (string, *ExtractedCredentials, error)

// Binder defines a function that knows how to perform a binding
type Binder func(si *ServiceInstance, params *Parameters) (string, *ExtractedCredentials, error)
type Binder func(si *ServiceInstance, params *Parameters, statusUpdate chan<- JobState) (string, *ExtractedCredentials, error)

// UnBinder defines a function that knows how to perform a unbinding
type UnBinder func(si *ServiceInstance, params *Parameters) error
type UnBinder func(si *ServiceInstance, params *Parameters, statusUpdate chan<- JobState) error
2 changes: 1 addition & 1 deletion pkg/apb/unbind.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

// Unbind - runs the abp with the unbind action.
func Unbind(instance *ServiceInstance, parameters *Parameters) error {
func Unbind(instance *ServiceInstance, parameters *Parameters, stateUpdates chan<- JobState) error {
log.Notice("============================================================")
log.Notice(" UNBINDING ")
log.Notice("============================================================")
Expand Down
34 changes: 28 additions & 6 deletions pkg/broker/binding_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func NewBindingJob(serviceInstance *apb.ServiceInstance, bindingUUID uuid.UUID,
// Run - run the binding job.
func (p *BindingJob) Run(token string, msgBuffer chan<- JobMsg) {
metrics.BindingJobStarted()
defer metrics.BindingJobFinished()
jobMsg := JobMsg{
InstanceUUID: p.serviceInstance.ID.String(),
JobToken: token,
Expand All @@ -59,12 +58,36 @@ func (p *BindingJob) Run(token string, msgBuffer chan<- JobMsg) {
Token: token,
},
}
log.Debug("bindjob: binding job started, calling apb.Bind")
// send starting state
msgBuffer <- jobMsg
stateUpdates := make(chan apb.JobState)
// variables set by the bind action
var (
err error
podName string
extCreds *apb.ExtractedCredentials
)
// run the bind async and block on reading the status updates
go func() {
defer func() {
metrics.BindingJobFinished()
close(stateUpdates)
}()
log.Debug("bindjob: binding job started, calling apb.Bind")

podName, extCreds, err = p.bind(p.serviceInstance, p.params, stateUpdates)

podName, extCreds, err := p.bind(p.serviceInstance, p.params)
log.Debug("bindjob: returned from apb.Bind")
}()

log.Debug("bindjob: returned from apb.Bind")
//read our status updates and send on updated JobMsgs for the subscriber to persist
for su := range stateUpdates {
su.Token = token
su.Method = apb.JobMethodDeprovision
msgBuffer <- JobMsg{InstanceUUID: p.serviceInstance.ID.String(), JobToken: token, State: su, PodName: su.Podname}
}

// status channel closed our job is complete lets check the err
if err != nil {
log.Errorf("bindjob::Binding error occurred.\n%s", err.Error())
jobMsg.State.State = apb.StateFailed
Expand All @@ -73,7 +96,6 @@ func (p *BindingJob) Run(token string, msgBuffer chan<- JobMsg) {
if err == apb.ErrorPodPullErr {
errMsg = err.Error()
}

// send error message
// can't have an error type in a struct you want marshalled
// https://github.com/golang/go/issues/5161
Expand All @@ -82,7 +104,7 @@ func (p *BindingJob) Run(token string, msgBuffer chan<- JobMsg) {
return
}

// send creds
// send creds if available and state success
log.Debug("bindjob: looks like we're done, sending credentials")
if nil != extCreds {
jobMsg.ExtractedCredentials = *extCreds
Expand Down
64 changes: 28 additions & 36 deletions pkg/broker/binding_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,25 @@ func TestBindingJob_Run(t *testing.T) {
Name string
Binder apb.Binder
BindParams *apb.Parameters
Validate func(msg broker.JobMsg) error
Validate func(msg []broker.JobMsg) error
}{
{
Name: "expect a success msg with extracted credentials",
Binder: func(si *apb.ServiceInstance, params *apb.Parameters) (string, *apb.ExtractedCredentials, error) {
Binder: func(si *apb.ServiceInstance, params *apb.Parameters, status chan<- apb.JobState) (string, *apb.ExtractedCredentials, error) {
return "podName", &apb.ExtractedCredentials{Credentials: map[string]interface{}{
"user": "test",
"pass": "test",
}}, nil
},
Validate: func(msg broker.JobMsg) error {
if msg.State.State != apb.StateSucceeded {
return fmt.Errorf("expected the state to be %v but got %v", apb.StateSucceeded, msg.State.State)
Validate: func(msgs []broker.JobMsg) error {
if err := commonJobMsgValidation(apb.StateSucceeded, apb.JobMethodBind, msgs); err != nil {
return err
}
if msg.State.Method != apb.JobMethodBind {
return fmt.Errorf("expected job method to be %v but it was %v", apb.JobMethodBind, msg.State.Method)
}
if msg.PodName == "" {
lastMsg := msgs[len(msgs)-1]
if lastMsg.PodName == "" {
return fmt.Errorf("expected the podName to be set but it was empty")
}
credentials := msg.ExtractedCredentials.Credentials
credentials := lastMsg.ExtractedCredentials.Credentials

if _, ok := credentials["user"]; !ok {
return fmt.Errorf("expected a user key in the credentials but it was missing")
Expand All @@ -58,42 +56,38 @@ func TestBindingJob_Run(t *testing.T) {
},
{
Name: "expect failure state and generic error when unknown error type",
Binder: func(si *apb.ServiceInstance, params *apb.Parameters) (string, *apb.ExtractedCredentials, error) {
Binder: func(si *apb.ServiceInstance, params *apb.Parameters, status chan<- apb.JobState) (string, *apb.ExtractedCredentials, error) {
return "", nil, fmt.Errorf("should not see")
},
Validate: func(msg broker.JobMsg) error {
if msg.State.State != apb.StateFailed {
return fmt.Errorf("expected the Job to be in state %v but was in %v ", apb.StateFailed, msg.State.State)
}
if msg.State.Method != apb.JobMethodBind {
return fmt.Errorf("expected job method to be %v but it was %v", apb.JobMethodBind, msg.State.Method)
Validate: func(msgs []broker.JobMsg) error {
if err := commonJobMsgValidation(apb.StateFailed, apb.JobMethodBind, msgs); err != nil {
return err
}
if msg.State.Error == "" {
lastMsg := msgs[len(msgs)-1]
if lastMsg.State.Error == "" {
return fmt.Errorf("expected an error in the job state but got none")
}
if msg.State.Error == "should not see" {
return fmt.Errorf("expected not to see the error msg %s it should have been replaced with a generic error ", msg.State.Error)
if lastMsg.State.Error == "should not see" {
return fmt.Errorf("expected not to see the error msg %s it should have been replaced with a generic error ", lastMsg.State.Error)
}
return nil
},
},
{
Name: "expect failure state and full error when known error type",
Binder: func(si *apb.ServiceInstance, params *apb.Parameters) (string, *apb.ExtractedCredentials, error) {
Binder: func(si *apb.ServiceInstance, params *apb.Parameters, status chan<- apb.JobState) (string, *apb.ExtractedCredentials, error) {
return "", nil, apb.ErrorPodPullErr
},
Validate: func(msg broker.JobMsg) error {
if msg.State.State != apb.StateFailed {
return fmt.Errorf("expected the Job to be in state %v but was in %v ", apb.StateFailed, msg.State.State)
}
if msg.State.Method != apb.JobMethodBind {
return fmt.Errorf("expected job method to be %v but it was %v", apb.JobMethodBind, msg.State.Method)
Validate: func(msgs []broker.JobMsg) error {
if err := commonJobMsgValidation(apb.StateFailed, apb.JobMethodBind, msgs); err != nil {
return err
}
if msg.State.Error == "" {
lastMsg := msgs[len(msgs)-1]
if lastMsg.State.Error == "" {
return fmt.Errorf("expected an error in the job state but got none")
}
if msg.State.Error != apb.ErrorPodPullErr.Error() {
return fmt.Errorf("expected to see the error msg %s but got %s ", apb.ErrorPodPullErr, msg.State.Error)
if lastMsg.State.Error != apb.ErrorPodPullErr.Error() {
return fmt.Errorf("expected to see the error msg %s but got %s ", apb.ErrorPodPullErr, lastMsg.State.Error)
}
return nil
},
Expand All @@ -104,18 +98,16 @@ func TestBindingJob_Run(t *testing.T) {
t.Run(tc.Name, func(t *testing.T) {
bindingJob := broker.NewBindingJob(serviceInstance, bindingID, tc.BindParams, tc.Binder)
receiver := make(chan broker.JobMsg)
timedOut := false
time.AfterFunc(1*time.Second, func() {
close(receiver)
timedOut = true
})
go bindingJob.Run("", receiver)

msg := <-receiver
if timedOut {
t.Fatal("timed out waiting for a msg from the Job")
var msgs []broker.JobMsg
for msg := range receiver {
msgs = append(msgs, msg)
}
if err := tc.Validate(msg); err != nil {
if err := tc.Validate(msgs); err != nil {
t.Fatal("failed to validate the jobmsg ", err)
}

Expand Down
81 changes: 20 additions & 61 deletions pkg/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,14 +642,6 @@ func (a AnsibleBroker) Provision(instanceUUID uuid.UUID, req *ProvisionRequest,
log.Error("Failed to start new job for async provision\n%s", err.Error())
return nil, err
}

// HACK: there might be a delay between the first time the state in etcd
// is set and the job was already started. But I need the token.
a.dao.SetState(instanceUUID.String(), apb.JobState{
Token: token,
State: apb.StateInProgress,
Method: apb.JobMethodProvision,
})
} else {
log.Info("reverting to synchronous provisioning in progress")
if err := a.engine.StartNewSyncJob(token, pjob, ProvisionTopic); err != nil {
Expand Down Expand Up @@ -710,14 +702,6 @@ func (a AnsibleBroker) Deprovision(
log.Error("Failed to start new job for async deprovision\n%s", err.Error())
return nil, err
}

// HACK: there might be a delay between the first time the state in etcd
// is set and the job was already started. But I need the token.
a.dao.SetState(instance.ID.String(), apb.JobState{
Token: token,
State: apb.StateInProgress,
Method: apb.JobMethodDeprovision,
})
return &DeprovisionResponse{Operation: token}, nil
}

Expand All @@ -727,11 +711,6 @@ func (a AnsibleBroker) Deprovision(
return nil, err
}
}

err = cleanupDeprovision(instance.ID.String(), a.dao)
if err != nil {
return nil, err
}
return &DeprovisionResponse{}, nil
}

Expand Down Expand Up @@ -900,48 +879,34 @@ func (a AnsibleBroker) Bind(instance apb.ServiceInstance, bindingUUID uuid.UUID,
// 'LaunchApbOnBind' of the broker config, due to lack of async support of
// bind in Open Service Broker API Currently, the 'launchapbonbind' is set
// to false in the 'config' ConfigMap
var bindExtCreds *apb.ExtractedCredentials

metrics.ActionStarted("bind")
var token string
var (
bindExtCreds *apb.ExtractedCredentials
token = a.engine.Token()
bindingJob = NewBindingJob(&instance, bindingUUID, &params, apb.Bind)
)

if async && a.brokerConfig.LaunchApbOnBind {
// asynchronous mode, requires that the launch apb config
// entry is on, and that async comes in from the catalog
log.Info("ASYNC binding in progress")
bindjob := NewBindingJob(&instance, bindingUUID, &params, apb.Bind)

token, err = a.engine.StartNewJob("", bindjob, BindingTopic)
token, err = a.engine.StartNewAsyncJob("", bindingJob, BindingTopic)
if err != nil {
log.Error("Failed to start new job for async binding\n%s", err.Error())
return nil, err
}

if err := a.dao.SetState(instance.ID.String(), apb.JobState{
Token: token,
State: apb.StateInProgress,
Method: apb.JobMethodBind,
}); err != nil {
log.Errorf("failed to set initial jobstate for %v, %v", token, err.Error())
}
} else if a.brokerConfig.LaunchApbOnBind {
// we are synchronous mode
log.Info("Broker configured to run APB bind")
_, bindExtCreds, err = apb.Bind(&instance, &params, nil)

if err != nil {
if err := a.engine.StartNewSyncJob(token, bindingJob, BindingTopic); err != nil {
return nil, err
}
//TODO are we only setting the bindingUUID if sync?
instance.AddBinding(bindingUUID)
if err := a.dao.SetServiceInstance(instance.ID.String(), &instance); err != nil {
return nil, err
}
if bindExtCreds != nil {
err = a.dao.SetExtractedCredentials(bindingUUID.String(), bindExtCreds)
if err != nil {
log.Errorf("Could not persist extracted credentials - %v", err)
return nil, err
}
}
} else {
log.Warning("Broker configured to *NOT* launch and run APB bind")
}
Expand Down Expand Up @@ -1022,35 +987,31 @@ func (a AnsibleBroker) Unbind(
}
metrics.ActionStarted("unbind")

var token string
var jerr error
var (
token = a.engine.Token()
jerr error
unbindJob = NewUnbindingJob(&serviceInstance, &bindInstance, &params, apb.Unbind, skipApbExecution)
)
if async && a.brokerConfig.LaunchApbOnBind {
// asynchronous mode, required that the launch apb config
// entry is on, and that async comes in from the catalog
log.Info("ASYNC unbinding in progress")
unbindjob := NewUnbindingJob(&serviceInstance, &bindInstance, &params, apb.Unbind, skipApbExecution)
token, jerr = a.engine.StartNewJob("", unbindjob, UnbindingTopic)

token, jerr = a.engine.StartNewAsyncJob("", unbindJob, UnbindingTopic)
if jerr != nil {
log.Error("Failed to start new job for async unbind\n%s", jerr.Error())
return nil, jerr
}

if err := a.dao.SetState(serviceInstance.ID.String(), apb.JobState{
Token: token,
State: apb.StateInProgress,
Method: apb.JobMethodUnbind,
}); err != nil {
log.Errorf("failed to set initial jobstate for %v, %v", token, err.Error())
}

} else if a.brokerConfig.LaunchApbOnBind {
// only launch apb if we are always launching the APB.
if skipApbExecution {
log.Debug("Skipping unbind apb execution")
err = nil
} else {
log.Debug("Launching apb for unbind in blocking mode")
err = apb.Unbind(&serviceInstance, &params, nil)
if err := a.engine.StartNewSyncJob(token, unbindJob, UnbindingTopic); err != nil {
return nil, err
}
}
if err != nil {
return nil, err
Expand All @@ -1059,6 +1020,7 @@ func (a AnsibleBroker) Unbind(
log.Warning("Broker configured to *NOT* launch and run APB unbind")
}

//TODO should these not be handled in the subscriber if the subscriber is handling state
if bindExtCreds != nil {
err = a.dao.DeleteExtractedCredentials(bindInstance.ID.String())
if err != nil {
Expand Down Expand Up @@ -1272,9 +1234,6 @@ func (a AnsibleBroker) Update(instanceUUID uuid.UUID, req *UpdateRequest, async
log.Error("Failed to start new job for async update\n%s", err.Error())
return nil, err
}
// HACK: there might be a delay between the first time the state in etcd
// is set and the job was already started. But I need the token.
a.dao.SetState(instanceUUID.String(), apb.JobState{Token: token, State: apb.StateInProgress, Method: apb.JobMethodUpdate})
} else {
log.Info("reverting to synchronous update in progress")
if err := a.engine.StartNewSyncJob(token, ujob, UpdateTopic); err != nil {
Expand Down
4 changes: 1 addition & 3 deletions pkg/broker/deprovision_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ func NewDeprovisionJob(serviceInstance *apb.ServiceInstance,
// Run - will run the deprovision job.
func (p *DeprovisionJob) Run(token string, msgBuffer chan<- JobMsg) {
metrics.DeprovisionJobStarted()
defer metrics.DeprovisionJobFinished()
stateUpdates := make(chan apb.JobState)
jobMsg := JobMsg{
InstanceUUID: p.serviceInstance.ID.String(),
JobToken: token,
Expand All @@ -54,14 +52,14 @@ func (p *DeprovisionJob) Run(token string, msgBuffer chan<- JobMsg) {
Token: token,
},
}
stateUpdates := make(chan apb.JobState)

var (
errMsg = "Error occurred during deprovision. Please contact administrator if it persists."
podName string
jobErr error
)

metrics.DeprovisionJobStarted()
go func() {
defer func() {
close(stateUpdates)
Expand Down
Loading

0 comments on commit 0175a9a

Please sign in to comment.