From f9f3762ff3c32e539492ba257257d54c0f1167d2 Mon Sep 17 00:00:00 2001 From: nouseforaname <34882943+nouseforaname@users.noreply.github.com> Date: Thu, 12 Sep 2024 10:21:09 +0200 Subject: [PATCH] move recover in progress to storage package --- cmd/serve.go | 10 +- dbservice/dbservice.go | 3 - dbservice/recover_in_progress_operations.go | 24 ----- .../recover_in_progress_operations_test.go | 60 ------------ .../storage/recover_in_progress_operations.go | 56 +++++++++++ .../recover_in_progress_operations_test.go | 98 +++++++++++++++++++ 6 files changed, 162 insertions(+), 89 deletions(-) delete mode 100644 dbservice/recover_in_progress_operations.go delete mode 100644 dbservice/recover_in_progress_operations_test.go create mode 100644 internal/storage/recover_in_progress_operations.go create mode 100644 internal/storage/recover_in_progress_operations_test.go diff --git a/cmd/serve.go b/cmd/serve.go index da14c80a6..dd26fba66 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -108,7 +108,13 @@ func serve() { logger.Fatal("Error initializing service broker config", err) } var serviceBroker domain.ServiceBroker - serviceBroker, err = osbapiBroker.New(cfg, storage.New(db, encryptor), logger) + csbStore := storage.New(db, encryptor) + err = csbStore.RecoverInProgressOperations(logger) + if err != nil { + logger.Fatal("Error recovering in-progress operations", err) + } + + serviceBroker, err = osbapiBroker.New(cfg, csbStore, logger) if err != nil { logger.Fatal("Error initializing service broker", err) } @@ -139,7 +145,7 @@ func serve() { if err != nil { logger.Error("failed to get database connection", err) } - startServer(cfg.Registry, sqldb, brokerAPI, storage.New(db, encryptor), credentials) + startServer(cfg.Registry, sqldb, brokerAPI, csbStore, credentials) } func serveDocs() { diff --git a/dbservice/dbservice.go b/dbservice/dbservice.go index 10b1a8faa..3aaf70502 100644 --- a/dbservice/dbservice.go +++ b/dbservice/dbservice.go @@ -43,9 +43,6 @@ func 
NewWithMigrations(logger lager.Logger) *gorm.DB { if err := RunMigrations(db); err != nil { panic(fmt.Sprintf("Error migrating database: %s", err)) } - if err := recoverInProgressOperations(db, logger); err != nil { - panic(fmt.Sprintf("Error recovering in-progress operations: %s", err)) - } }) return db } diff --git a/dbservice/recover_in_progress_operations.go b/dbservice/recover_in_progress_operations.go deleted file mode 100644 index 9b9c1a549..000000000 --- a/dbservice/recover_in_progress_operations.go +++ /dev/null @@ -1,24 +0,0 @@ -package dbservice - -import ( - "code.cloudfoundry.org/lager/v3" - "github.com/cloudfoundry/cloud-service-broker/v2/dbservice/models" - "gorm.io/gorm" -) - -func recoverInProgressOperations(db *gorm.DB, logger lager.Logger) error { - logger = logger.Session("recover-in-progress-operations") - - var terraformDeploymentBatch []models.TerraformDeployment - result := db.Where("last_operation_state = ?", "in progress").FindInBatches(&terraformDeploymentBatch, 100, func(tx *gorm.DB, batchNumber int) error { - for i := range terraformDeploymentBatch { - terraformDeploymentBatch[i].LastOperationState = "failed" - terraformDeploymentBatch[i].LastOperationMessage = "the broker restarted while the operation was in progress" - logger.Info("mark-as-failed", lager.Data{"workspace_id": terraformDeploymentBatch[i].ID}) - } - - return tx.Save(&terraformDeploymentBatch).Error - }) - - return result.Error -} diff --git a/dbservice/recover_in_progress_operations_test.go b/dbservice/recover_in_progress_operations_test.go deleted file mode 100644 index c757e94ea..000000000 --- a/dbservice/recover_in_progress_operations_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package dbservice - -import ( - "code.cloudfoundry.org/lager/v3/lagertest" - "github.com/cloudfoundry/cloud-service-broker/v2/dbservice/models" - . "github.com/onsi/ginkgo/v2" - . 
"github.com/onsi/gomega" - "gorm.io/driver/sqlite" - "gorm.io/gorm" -) - -var _ = Describe("RecoverInProgressOperations()", func() { - It("recovers the expected operations", func() { - const ( - recoverID = "fake-id-to-recover" - okID = "fake-id-that-does-not-need-to-be-recovered" - ) - - // Setup - db, err := gorm.Open(sqlite.Open(":memory:"), nil) - Expect(err).NotTo(HaveOccurred()) - Expect(db.Migrator().CreateTable(&models.TerraformDeployment{})).To(Succeed()) - - Expect(db.Create(&models.TerraformDeployment{ - ID: recoverID, - LastOperationType: "fake-type", - LastOperationState: "in progress", - LastOperationMessage: "fake-type in progress", - }).Error).To(Succeed()) - Expect(db.Create(&models.TerraformDeployment{ - ID: okID, - LastOperationType: "fake-type", - LastOperationState: "succeeded", - LastOperationMessage: "fake-type succeeded", - }).Error).To(Succeed()) - - // Call the function - logger := lagertest.NewTestLogger("test") - recoverInProgressOperations(db, logger) - - // Behaviors - By("marking the in-progress operation as failed") - var r1 models.TerraformDeployment - Expect(db.Where("id = ?", recoverID).First(&r1).Error).To(Succeed()) - Expect(r1.LastOperationState).To(Equal("failed")) - Expect(r1.LastOperationMessage).To(Equal("the broker restarted while the operation was in progress")) - - By("no updating other operations") - var r2 models.TerraformDeployment - Expect(db.Where("id = ?", okID).First(&r2).Error).To(Succeed()) - Expect(r2.LastOperationState).To(Equal("succeeded")) - Expect(r2.LastOperationMessage).To(Equal("fake-type succeeded")) - - By("logging the expected message") - Expect(logger.Buffer().Contents()).To(SatisfyAll( - ContainSubstring(`"message":"test.recover-in-progress-operations.mark-as-failed"`), - ContainSubstring(`"workspace_id":"fake-id-to-recover"`), - )) - }) -}) diff --git a/internal/storage/recover_in_progress_operations.go b/internal/storage/recover_in_progress_operations.go new file mode 100644 index 
000000000..ae8f48c9c
--- /dev/null
+++ b/internal/storage/recover_in_progress_operations.go
@@ -0,0 +1,71 @@
+package storage
+
+import (
+	"fmt"
+	"os"
+
+	"code.cloudfoundry.org/lager/v3"
+	"github.com/cloudfoundry/cloud-service-broker/v2/dbservice/models"
+	"gorm.io/gorm"
+)
+
+// FailedMessage is the last operation message stored on every deployment that
+// RecoverInProgressOperations marks as failed.
+const FailedMessage = "the broker restarted while the operation was in progress"
+
+// RecoverInProgressOperations marks deployments whose operation was interrupted
+// by a broker restart as "failed", sets their message to FailedMessage and logs
+// a "mark-as-failed" entry for each of them.
+//
+// If CF_INSTANCE_GUID is set the broker is treated as a CF app and every
+// deployment in the "in progress" state is failed. Otherwise only the
+// deployments returned by GetLockedDeploymentIds are failed.
+//
+// It returns the first storage error encountered, or nil on success.
+func (s *Storage) RecoverInProgressOperations(logger lager.Logger) error {
+	logger = logger.Session("recover-in-progress-operations")
+
+	// We only want to fail every interrupted service instance when we detect that
+	// we run as a CF app. VM based csb instances implement a drain mechanism and
+	// should not need this. Additionally, VM based csb deployments are scalable
+	// horizontally, so failing everything "in progress" would also fail in-flight
+	// operations of another csb process.
+	if os.Getenv("CF_INSTANCE_GUID") != "" {
+		var batch []models.TerraformDeployment
+		result := s.db.Where("last_operation_state = ?", "in progress").FindInBatches(&batch, 100, func(tx *gorm.DB, _ int) error {
+			for i := range batch {
+				batch[i].LastOperationState = "failed"
+				batch[i].LastOperationMessage = FailedMessage
+				logger.Info("mark-as-failed", lager.Data{"workspace_id": batch[i].ID})
+			}
+
+			return tx.Save(&batch).Error
+		})
+
+		return result.Error
+	}
+
+	// Not a CF app: only fail the deployments reported by GetLockedDeploymentIds.
+	lockedIDs, err := s.GetLockedDeploymentIds()
+	if err != nil {
+		return fmt.Errorf("listing locked deployments: %w", err)
+	}
+
+	for _, id := range lockedIDs {
+		var deployment models.TerraformDeployment
+		if err := s.db.Where("id = ?", id).First(&deployment).Error; err != nil {
+			return fmt.Errorf("loading locked deployment %v: %w", id, err)
+		}
+		deployment.LastOperationState = "failed"
+		deployment.LastOperationMessage = FailedMessage
+
+		// Save is given a pointer, as GORM documents, so it operates on an
+		// addressable value instead of a copy of the struct.
+		if err := s.db.Save(&deployment).Error; err != nil {
+			return fmt.Errorf("marking deployment %v as failed: %w", id, err)
+		}
+		logger.Info("mark-as-failed", lager.Data{"workspace_id": id})
+	}
+
+	return nil
+}
diff --git
a/internal/storage/recover_in_progress_operations_test.go b/internal/storage/recover_in_progress_operations_test.go
new file mode 100644
index 000000000..9ad4fa919
--- /dev/null
+++ b/internal/storage/recover_in_progress_operations_test.go
@@ -0,0 +1,108 @@
+package storage_test
+
+import (
+	"os"
+
+	"code.cloudfoundry.org/lager/v3/lagertest"
+	"github.com/cloudfoundry/cloud-service-broker/v2/dbservice/models"
+	"github.com/cloudfoundry/cloud-service-broker/v2/internal/storage"
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+)
+
+const (
+	recoverID = "fake-id-to-recover"
+	okID      = "fake-id-that-does-not-need-to-be-recovered"
+)
+
+var _ = Describe("RecoverInProgressOperations()", func() {
+	const cfInstanceGUID = "CF_INSTANCE_GUID"
+
+	// pinCFInstanceGUID sets CF_INSTANCE_GUID to value (or unsets it when value is
+	// empty) for the current spec and restores the previous state afterwards, so
+	// the specs do not depend on, or leak into, the ambient environment.
+	pinCFInstanceGUID := func(value string) {
+		previous, wasSet := os.LookupEnv(cfInstanceGUID)
+		DeferCleanup(func() {
+			if wasSet {
+				Expect(os.Setenv(cfInstanceGUID, previous)).To(Succeed())
+				return
+			}
+			Expect(os.Unsetenv(cfInstanceGUID)).To(Succeed())
+		})
+
+		if value == "" {
+			Expect(os.Unsetenv(cfInstanceGUID)).To(Succeed())
+			return
+		}
+		Expect(os.Setenv(cfInstanceGUID, value)).To(Succeed())
+	}
+
+	// expectOnlyInterruptedOperationFailed holds the assertions shared by both specs.
+	expectOnlyInterruptedOperationFailed := func() {
+		By("marking the in-progress operation as failed")
+		var r1 models.TerraformDeployment
+		Expect(db.Where("id = ?", recoverID).First(&r1).Error).To(Succeed())
+		Expect(r1.LastOperationState).To(Equal("failed"))
+		Expect(r1.LastOperationMessage).To(Equal("the broker restarted while the operation was in progress"))
+
+		By("no updating other operations")
+		var r2 models.TerraformDeployment
+		Expect(db.Where("id = ?", okID).First(&r2).Error).To(Succeed())
+		Expect(r2.LastOperationState).To(Equal("succeeded"))
+		Expect(r2.LastOperationMessage).To(Equal("fake-type succeeded"))
+
+		By("logging the expected message")
+		Expect(logger.Buffer().Contents()).To(SatisfyAll(
+			ContainSubstring(`"message":"test.recover-in-progress-operations.mark-as-failed"`),
+			ContainSubstring(`"workspace_id":"fake-id-to-recover"`),
+		))
+	}
+
+	BeforeEach(func() {
+		// Setup: one interrupted deployment and one that finished successfully.
+		Expect(db.Create(&models.TerraformDeployment{
+			ID:                   recoverID,
+			LastOperationType:    "fake-type",
+			LastOperationState:   "in progress",
+			LastOperationMessage: "fake-type in progress",
+		}).Error).To(Succeed())
+		Expect(db.Create(&models.TerraformDeployment{
+			ID:                   okID,
+			LastOperationType:    "fake-type",
+			LastOperationState:   "succeeded",
+			LastOperationMessage: "fake-type succeeded",
+		}).Error).To(Succeed())
+		var rowCount int64
+		Expect(db.Model(&models.TerraformDeployment{}).Count(&rowCount).Error).To(Succeed())
+		Expect(rowCount).To(BeNumerically("==", 2))
+
+		logger = lagertest.NewTestLogger("test")
+		store = storage.New(db, encryptor)
+	})
+
+	When("running as a cf app", func() {
+		It("recovers the expected operations", func() {
+			// The presence of this variable means we are running as an App
+			pinCFInstanceGUID("something")
+
+			Expect(store.RecoverInProgressOperations(logger)).To(Succeed())
+
+			expectOnlyInterruptedOperationFailed()
+		})
+	})
+
+	When("running on a VM", func() {
+		It("recovers the expected operations", func() {
+			// Without this variable the broker takes the VM code path
+			pinCFInstanceGUID("")
+
+			// When running on a VM there will be a lockfile and record in the db
+			Expect(store.WriteLockFile(recoverID)).To(Succeed())
+
+			Expect(store.RecoverInProgressOperations(logger)).To(Succeed())
+
+			expectOnlyInterruptedOperationFailed()
+		})
+	})
+})