Skip to content

Commit

Permalink
move recover in progress to storage package
Browse files Browse the repository at this point in the history
  • Loading branch information
nouseforaname committed Sep 12, 2024
1 parent a88ba04 commit f9f3762
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 89 deletions.
10 changes: 8 additions & 2 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,13 @@ func serve() {
logger.Fatal("Error initializing service broker config", err)
}
var serviceBroker domain.ServiceBroker
serviceBroker, err = osbapiBroker.New(cfg, storage.New(db, encryptor), logger)
csbStore := storage.New(db, encryptor)
err = csbStore.RecoverInProgressOperations(logger)
if err != nil {
logger.Fatal("Error recovering in-progress operations", err)
}

serviceBroker, err = osbapiBroker.New(cfg, csbStore, logger)
if err != nil {
logger.Fatal("Error initializing service broker", err)
}
Expand Down Expand Up @@ -139,7 +145,7 @@ func serve() {
if err != nil {
logger.Error("failed to get database connection", err)
}
startServer(cfg.Registry, sqldb, brokerAPI, storage.New(db, encryptor), credentials)
startServer(cfg.Registry, sqldb, brokerAPI, csbStore, credentials)
}

func serveDocs() {
Expand Down
3 changes: 0 additions & 3 deletions dbservice/dbservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ func NewWithMigrations(logger lager.Logger) *gorm.DB {
if err := RunMigrations(db); err != nil {
panic(fmt.Sprintf("Error migrating database: %s", err))
}
if err := recoverInProgressOperations(db, logger); err != nil {
panic(fmt.Sprintf("Error recovering in-progress operations: %s", err))
}
})
return db
}
24 changes: 0 additions & 24 deletions dbservice/recover_in_progress_operations.go

This file was deleted.

60 changes: 0 additions & 60 deletions dbservice/recover_in_progress_operations_test.go

This file was deleted.

56 changes: 56 additions & 0 deletions internal/storage/recover_in_progress_operations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package storage

import (
"os"

"code.cloudfoundry.org/lager/v3"
"github.com/cloudfoundry/cloud-service-broker/v2/dbservice/models"
"gorm.io/gorm"
)

// FailedMessage is the last-operation message written to every deployment that
// RecoverInProgressOperations marks as failed.
const FailedMessage = "the broker restarted while the operation was in progress"

// RecoverInProgressOperations marks Terraform deployments whose last operation
// was interrupted by a broker restart as failed, setting their last operation
// message to FailedMessage.
//
// The set of deployments that is updated depends on the environment:
//   - If the CF_INSTANCE_GUID environment variable is non-empty, every
//     deployment whose last_operation_state is "in progress" is updated.
//   - Otherwise only the deployments whose IDs are returned by
//     GetLockedDeploymentIds are updated.
//
// A "mark-as-failed" entry carrying the deployment ID is logged for each
// affected deployment. The first error encountered is returned and stops
// further processing; nil is returned when everything succeeded.
func (s *Storage) RecoverInProgressOperations(logger lager.Logger) error {
	logger = logger.Session("recover-in-progress-operations")

	// We only want to fail all interrupted service instances if we detect that
	// we are running as a CF app. VM based csb instances implement a drain
	// mechanism and should not need this. Additionally, VM based csb deployments
	// are horizontally scalable, and failing everything that is "in progress"
	// would also fail the in-flight instances of another csb process.
	if os.Getenv("CF_INSTANCE_GUID") != "" {
		var batch []models.TerraformDeployment
		result := s.db.Where("last_operation_state = ?", "in progress").FindInBatches(&batch, 100, func(tx *gorm.DB, batchNumber int) error {
			for i := range batch {
				batch[i].LastOperationState = "failed"
				batch[i].LastOperationMessage = FailedMessage
				logger.Info("mark-as-failed", lager.Data{"workspace_id": batch[i].ID})
			}

			// Persist the whole batch in one call.
			return tx.Save(&batch).Error
		})

		return result.Error
	}

	// Not running as a CF app: only touch the deployments reported as locked.
	// NOTE(review): presumably these are deployments for which a lock file was
	// left behind by an interrupted operation — confirm against
	// GetLockedDeploymentIds.
	deploymentIDs, err := s.GetLockedDeploymentIds()
	if err != nil {
		return err
	}

	for _, id := range deploymentIDs {
		var deployment models.TerraformDeployment
		if err := s.db.Where("id = ?", id).First(&deployment).Error; err != nil {
			return err
		}
		deployment.LastOperationState = "failed"
		deployment.LastOperationMessage = FailedMessage

		// Pass a pointer, as GORM documents for Save.
		if err := s.db.Save(&deployment).Error; err != nil {
			return err
		}
		logger.Info("mark-as-failed", lager.Data{"workspace_id": id})
	}

	// Every error path above returns early, so reaching this point is success.
	return nil
}
98 changes: 98 additions & 0 deletions internal/storage/recover_in_progress_operations_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package storage_test

import (
"os"

"code.cloudfoundry.org/lager/v3/lagertest"
"github.com/cloudfoundry/cloud-service-broker/v2/dbservice/models"
"github.com/cloudfoundry/cloud-service-broker/v2/internal/storage"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

// IDs of the two deployment fixtures created before each spec.
const (
// recoverID identifies the fixture created with an "in progress" last
// operation state.
recoverID = "fake-id-to-recover"
// okID identifies the fixture created with a "succeeded" last operation
// state.
okID = "fake-id-that-does-not-need-to-be-recovered"
)

// Specs for Storage.RecoverInProgressOperations. Each spec starts from two
// deployment rows: one "in progress" (recoverID) and one "succeeded" (okID).
var _ = Describe("RecoverInProgressOperations()", func() {
	// expectOnlyInProgressOperationRecovered holds the assertions shared by
	// both environments: the in-progress row is failed, the other row is
	// untouched, and the recovery is logged.
	expectOnlyInProgressOperationRecovered := func() {
		By("marking the in-progress operation as failed")
		var r1 models.TerraformDeployment
		Expect(db.Where("id = ?", recoverID).First(&r1).Error).To(Succeed())
		Expect(r1.LastOperationState).To(Equal("failed"))
		Expect(r1.LastOperationMessage).To(Equal("the broker restarted while the operation was in progress"))

		By("no updating other operations")
		var r2 models.TerraformDeployment
		Expect(db.Where("id = ?", okID).First(&r2).Error).To(Succeed())
		Expect(r2.LastOperationState).To(Equal("succeeded"))
		Expect(r2.LastOperationMessage).To(Equal("fake-type succeeded"))

		By("logging the expected message")
		Expect(logger.Buffer().Contents()).To(SatisfyAll(
			ContainSubstring(`"message":"test.recover-in-progress-operations.mark-as-failed"`),
			ContainSubstring(`"workspace_id":"fake-id-to-recover"`),
		))
	}

	BeforeEach(func() {
		// Setup
		Expect(db.Create(&models.TerraformDeployment{
			ID:                   recoverID,
			LastOperationType:    "fake-type",
			LastOperationState:   "in progress",
			LastOperationMessage: "fake-type in progress",
		}).Error).To(Succeed())
		Expect(db.Create(&models.TerraformDeployment{
			ID:                   okID,
			LastOperationType:    "fake-type",
			LastOperationState:   "succeeded",
			LastOperationMessage: "fake-type succeeded",
		}).Error).To(Succeed())
		var rowCount int64
		db.Model(&models.TerraformDeployment{}).Count(&rowCount)
		Expect(rowCount).To(BeNumerically("==", 2))

		logger = lagertest.NewTestLogger("test")
		store = storage.New(db, encryptor)
	})

	When("running as a cf app", func() {
		It("recovers the expected operations", func() {
			// The presence of this variable means we are running as an App
			Expect(os.Setenv("CF_INSTANCE_GUID", "something")).To(Succeed())
			DeferCleanup(os.Unsetenv, "CF_INSTANCE_GUID")

			// Call the function
			Expect(store.RecoverInProgressOperations(logger)).To(Succeed())

			// Behaviors
			expectOnlyInProgressOperationRecovered()
		})
	})

	When("running on a VM", func() {
		It("recovers the expected operations", func() {
			// Make sure the CF app marker is absent even if the ambient
			// environment defines it; otherwise this spec would silently
			// exercise the CF app code path. The previous value is restored
			// once the spec finishes.
			if previous, ok := os.LookupEnv("CF_INSTANCE_GUID"); ok {
				Expect(os.Unsetenv("CF_INSTANCE_GUID")).To(Succeed())
				DeferCleanup(os.Setenv, "CF_INSTANCE_GUID", previous)
			}

			// When running on a VM there will be a lockfile and record in the db
			Expect(store.WriteLockFile(recoverID)).To(Succeed())

			// Call the function
			Expect(store.RecoverInProgressOperations(logger)).To(Succeed())

			// Behaviors
			expectOnlyInProgressOperationRecovered()
		})
	})
})

0 comments on commit f9f3762

Please sign in to comment.