diff --git a/cmd/serve.go b/cmd/serve.go index 5945e7dec..10db1f22e 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -109,6 +109,8 @@ func serve() { } var serviceBroker domain.ServiceBroker csbStore := storage.New(db, encryptor) + csbStore.RecoverInProgressOperations(logger) + serviceBroker, err = osbapiBroker.New(cfg, csbStore, logger) if err != nil { logger.Fatal("Error initializing service broker", err) @@ -140,7 +142,7 @@ func serve() { if err != nil { logger.Error("failed to get database connection", err) } - startServer(cfg.Registry, sqldb, brokerAPI, storage.New(db, encryptor), credentials) + startServer(cfg.Registry, sqldb, brokerAPI, csbStore, credentials) } func serveDocs() { diff --git a/dbservice/dbservice.go b/dbservice/dbservice.go index 9fdea783d..3aaf70502 100644 --- a/dbservice/dbservice.go +++ b/dbservice/dbservice.go @@ -17,7 +17,6 @@ package dbservice import ( "fmt" - "os" "sync" "code.cloudfoundry.org/lager/v3" @@ -44,15 +43,6 @@ func NewWithMigrations(logger lager.Logger) *gorm.DB { if err := RunMigrations(db); err != nil { panic(fmt.Sprintf("Error migrating database: %s", err)) } - // We only wan't to fail interrupted service instances if we detect that we run as a CF APP. - // VM based csb instances implement a drain mechanism and should need this. Additionally VM - // based csb deployments are scalable horizontally and the below would fail in flight instances - // of another csb process. - if os.Getenv("CF_INSTANCE_GUID") != "" { - if err := recoverInProgressOperations(db, logger); err != nil { - panic(fmt.Sprintf("Error recovering in-progress operations: %s", err)) - } - } }) return db } diff --git a/dbservice/recover_in_progress_operations.go b/dbservice/recover_in_progress_operations.go deleted file mode 100644 index 9b9c1a549..000000000 --- a/dbservice/recover_in_progress_operations.go +++ /dev/null @@ -1,24 +0,0 @@ -package dbservice - -import ( - "code.cloudfoundry.org/lager/v3" - "github.com/cloudfoundry/cloud-service-broker/v2/dbservice/models" - "gorm.io/gorm" -) - -func recoverInProgressOperations(db *gorm.DB, logger lager.Logger) error { - logger = logger.Session("recover-in-progress-operations") - - var terraformDeploymentBatch []models.TerraformDeployment - result := db.Where("last_operation_state = ?", "in progress").FindInBatches(&terraformDeploymentBatch, 100, func(tx *gorm.DB, batchNumber int) error { - for i := range terraformDeploymentBatch { - terraformDeploymentBatch[i].LastOperationState = "failed" - terraformDeploymentBatch[i].LastOperationMessage = "the broker restarted while the operation was in progress" - logger.Info("mark-as-failed", lager.Data{"workspace_id": terraformDeploymentBatch[i].ID}) - } - - return tx.Save(&terraformDeploymentBatch).Error - }) - - return result.Error -} diff --git a/dbservice/recover_in_progress_operations_test.go b/dbservice/recover_in_progress_operations_test.go deleted file mode 100644 index c757e94ea..000000000 --- a/dbservice/recover_in_progress_operations_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package dbservice - -import ( - "code.cloudfoundry.org/lager/v3/lagertest" - "github.com/cloudfoundry/cloud-service-broker/v2/dbservice/models" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "gorm.io/driver/sqlite" - "gorm.io/gorm" -) - -var _ = Describe("RecoverInProgressOperations()", func() { - It("recovers the expected operations", func() { - const ( - recoverID = "fake-id-to-recover" - okID = "fake-id-that-does-not-need-to-be-recovered" - ) - - // Setup - db, err := gorm.Open(sqlite.Open(":memory:"), nil) - Expect(err).NotTo(HaveOccurred()) - Expect(db.Migrator().CreateTable(&models.TerraformDeployment{})).To(Succeed()) - - Expect(db.Create(&models.TerraformDeployment{ - ID: recoverID, - LastOperationType: "fake-type", - LastOperationState: "in progress", - LastOperationMessage: "fake-type in progress", - }).Error).To(Succeed()) - Expect(db.Create(&models.TerraformDeployment{ - ID: okID, - LastOperationType: "fake-type", - LastOperationState: "succeeded", - LastOperationMessage: "fake-type succeeded", - }).Error).To(Succeed()) - - // Call the function - logger := lagertest.NewTestLogger("test") - recoverInProgressOperations(db, logger) - - // Behaviors - By("marking the in-progress operation as failed") - var r1 models.TerraformDeployment - Expect(db.Where("id = ?", recoverID).First(&r1).Error).To(Succeed()) - Expect(r1.LastOperationState).To(Equal("failed")) - Expect(r1.LastOperationMessage).To(Equal("the broker restarted while the operation was in progress")) - - By("no updating other operations") - var r2 models.TerraformDeployment - Expect(db.Where("id = ?", okID).First(&r2).Error).To(Succeed()) - Expect(r2.LastOperationState).To(Equal("succeeded")) - Expect(r2.LastOperationMessage).To(Equal("fake-type succeeded")) - - By("logging the expected message") - Expect(logger.Buffer().Contents()).To(SatisfyAll( - ContainSubstring(`"message":"test.recover-in-progress-operations.mark-as-failed"`), - ContainSubstring(`"workspace_id":"fake-id-to-recover"`), - )) - }) -}) diff --git a/internal/storage/recover_in_progress_operations.go b/internal/storage/recover_in_progress_operations.go new file mode 100644 index 000000000..47e3f5aa7 --- /dev/null +++ b/internal/storage/recover_in_progress_operations.go @@ -0,0 +1,51 @@ +package storage + +import ( + "os" + + "code.cloudfoundry.org/lager/v3" + "github.com/cloudfoundry/cloud-service-broker/v2/dbservice/models" + "gorm.io/gorm" +) + +func (s *Storage) RecoverInProgressOperations(logger lager.Logger) error { + logger = logger.Session("recover-in-progress-operations") + + // We only wan't to fail interrupted service instances if we detect that we run as a CF APP. + // VM based csb instances implement a drain mechanism and should need this. Additionally VM + // based csb deployments are scalable horizontally and the below would fail in flight instances + // of another csb process. + if os.Getenv("CF_INSTANCE_GUID") != "" { + var terraformDeploymentBatch []models.TerraformDeployment + result := s.db.Where("last_operation_state = ?", "in progress").FindInBatches(&terraformDeploymentBatch, 100, func(tx *gorm.DB, batchNumber int) error { + for i := range terraformDeploymentBatch { + terraformDeploymentBatch[i].LastOperationState = "failed" + terraformDeploymentBatch[i].LastOperationMessage = "the broker restarted while the operation was in progress" + logger.Info("mark-as-failed", lager.Data{"workspace_id": terraformDeploymentBatch[i].ID}) + } + + return tx.Save(&terraformDeploymentBatch).Error + }) + + return result.Error + } else { + deploymentIds, err := s.LockedDeploymentIds() + if err != nil { + return err + } + + for _, id := range deploymentIds { + var receiver models.TerraformDeployment + if err := s.db.Where("id = ?", id).First(&receiver).Error; err != nil { + return err + } + receiver.LastOperationState = "failed" + err := s.db.Save(receiver).Error + if err != nil { + return err + } + } + return err + } + +} diff --git a/internal/storage/recover_in_progress_operations_test.go b/internal/storage/recover_in_progress_operations_test.go new file mode 100644 index 000000000..c3036096f --- /dev/null +++ b/internal/storage/recover_in_progress_operations_test.go @@ -0,0 +1,120 @@ +package storage_test + +import ( + "errors" + "os" + "strings" + + "code.cloudfoundry.org/lager/v3/lagertest" + "github.com/cloudfoundry/cloud-service-broker/v2/dbservice/models" + "github.com/cloudfoundry/cloud-service-broker/v2/internal/storage" + "github.com/cloudfoundry/cloud-service-broker/v2/internal/storage/storagefakes" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +const ( + recoverID = "fake-id-to-recover" + okID = "fake-id-that-does-not-need-to-be-recovered" +) + +var _ = Describe("RecoverInProgressOperations()", func() { + BeforeEach(func() { + + // Setup + db, err := gorm.Open(sqlite.Open(":memory:"), nil) + Expect(err).NotTo(HaveOccurred()) + Expect(db.Migrator().CreateTable(&models.TerraformDeployment{})).To(Succeed()) + + Expect(db.Create(&models.TerraformDeployment{ + ID: recoverID, + LastOperationType: "fake-type", + LastOperationState: "in progress", + LastOperationMessage: "fake-type in progress", + }).Error).To(Succeed()) + Expect(db.Create(&models.TerraformDeployment{ + ID: okID, + LastOperationType: "fake-type", + LastOperationState: "succeeded", + LastOperationMessage: "fake-type succeeded", + }).Error).To(Succeed()) + + encryptor := &storagefakes.FakeEncryptor{ + DecryptStub: func(bytes []byte) ([]byte, error) { + if string(bytes) == `cannot-be-decrypted` { + return nil, errors.New("fake decryption error") + } + return bytes, nil + }, + EncryptStub: func(bytes []byte) ([]byte, error) { + if strings.Contains(string(bytes), `cannot-be-encrypted`) { + return nil, errors.New("fake encryption error") + } + return []byte(`{"encrypted":` + string(bytes) + `}`), nil + }, + } + + logger = lagertest.NewTestLogger("test") + store = storage.New(db, encryptor) + }) + + When("running as a cf app", func() { + It("recovers the expected operations", func() { + os.Setenv("CF_INSTANCE_GUID", "something") // The presence of this variable means we are running as an App + defer os.Unsetenv("CF_INSTANCE_GUID") + + // Call the function + store.RecoverInProgressOperations(logger) + + // Behaviors + By("marking the in-progress operation as failed") + var r1 models.TerraformDeployment + Expect(db.Where("id = ?", recoverID).First(&r1).Error).To(Succeed()) + Expect(r1.LastOperationState).To(Equal("failed")) + Expect(r1.LastOperationMessage).To(Equal("the broker restarted while the operation was in progress")) + + By("no updating other operations") + var r2 models.TerraformDeployment + Expect(db.Where("id = ?", okID).First(&r2).Error).To(Succeed()) + Expect(r2.LastOperationState).To(Equal("succeeded")) + Expect(r2.LastOperationMessage).To(Equal("fake-type succeeded")) + + By("logging the expected message") + Expect(logger.Buffer().Contents()).To(SatisfyAll( + ContainSubstring(`"message":"test.recover-in-progress-operations.mark-as-failed"`), + ContainSubstring(`"workspace_id":"fake-id-to-recover"`), + )) + }) + }) + + When("running on a VM", func() { + It("recovers the expected operations", func() { + // When running on a VM there will be a lockfile and record in the db + store.WriteLockFile(recoverID) + + // Call the function + store.RecoverInProgressOperations(logger) + + // Behaviors + By("marking the in-progress operation as failed") + var r1 models.TerraformDeployment + Expect(db.Where("id = ?", recoverID).First(&r1).Error).To(Succeed()) + Expect(r1.LastOperationState).To(Equal("failed")) + Expect(r1.LastOperationMessage).To(Equal("the broker restarted while the operation was in progress")) + + By("no updating other operations") + var r2 models.TerraformDeployment + Expect(db.Where("id = ?", okID).First(&r2).Error).To(Succeed()) + Expect(r2.LastOperationState).To(Equal("succeeded")) + Expect(r2.LastOperationMessage).To(Equal("fake-type succeeded")) + + By("logging the expected message") + Expect(logger.Buffer().Contents()).To(SatisfyAll( + ContainSubstring(`"message":"test.recover-in-progress-operations.mark-as-failed"`), + ContainSubstring(`"workspace_id":"fake-id-to-recover"`), + )) + }) + }) +}) diff --git a/internal/storage/storage_suite_test.go b/internal/storage/storage_suite_test.go index c02209966..31728a8c0 100644 --- a/internal/storage/storage_suite_test.go +++ b/internal/storage/storage_suite_test.go @@ -5,6 +5,7 @@ import ( "strings" "testing" + "code.cloudfoundry.org/lager/v3/lagertest" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "gorm.io/driver/sqlite" @@ -19,6 +20,7 @@ var ( db *gorm.DB encryptor *storagefakes.FakeEncryptor store *storage.Storage + logger *lagertest.TestLogger ) func TestStorage(t *testing.T) { diff --git a/internal/storage/terraform_deployment.go b/internal/storage/terraform_deployment.go index 168bb15eb..f9b2b1f95 100644 --- a/internal/storage/terraform_deployment.go +++ b/internal/storage/terraform_deployment.go @@ -172,6 +172,24 @@ func (s *Storage) RemoveLockFile(deploymentID string) error { return os.Remove(fmt.Sprintf("%s/%s", s.lockFileDir, sanitizeFileName(deploymentID))) } +func (s *Storage) GetLockFileNames() ([]string, error) { + entries, err := os.ReadDir(s.lockFileDir) + var names []string + for _, entry := range entries { + names = append(names, entry.Name()) + } + return names, err +} + +func (s *Storage) LockedDeploymentIds() ([]string, error) { + entries, err := os.ReadDir(s.lockFileDir) + var names []string + for _, entry := range entries { + names = append(names, strings.ReplaceAll(entry.Name(), "_", ":")) + } + return names, err +} + func sanitizeFileName(name string) string { return strings.ReplaceAll(name, ":", "_") } diff --git a/internal/storage/terraform_deployment_test.go b/internal/storage/terraform_deployment_test.go index 420923123..a6d390e67 100644 --- a/internal/storage/terraform_deployment_test.go +++ b/internal/storage/terraform_deployment_test.go @@ -4,6 +4,7 @@ import ( "encoding/base64" "errors" "fmt" + "os" "strings" "github.com/hashicorp/go-version" @@ -34,6 +35,7 @@ var _ = Describe("TerraformDeployments", func() { }, } + os.RemoveAll("/tmp/csb/") store = storage.New(db, encryptor) }) @@ -222,6 +224,34 @@ var _ = Describe("TerraformDeployments", func() { Expect(store.LockFilesExist()).To(BeFalse()) }) }) + + Describe("GetLockFileNames", func() { + FIt("returns correct names", func() { + names, err := store.GetLockFileNames() + Expect(err).NotTo(HaveOccurred()) + Expect(names).To(BeEmpty()) + + Expect(store.WriteLockFile("1234")).To(Succeed()) + Expect(store.WriteLockFile("5678")).To(Succeed()) + + names, err = store.GetLockFileNames() + Expect(err).NotTo(HaveOccurred()) + Expect(names).To(ContainElements("1234", "5678")) + + Expect(store.RemoveLockFile("1234")).To(Succeed()) + + names, err = store.GetLockFileNames() + Expect(err).NotTo(HaveOccurred()) + Expect(names).To(ContainElements("5678")) + Expect(names).ToNot(ContainElements("1234")) + + Expect(store.RemoveLockFile("5678")).To(Succeed()) + + names, err = store.GetLockFileNames() + Expect(err).NotTo(HaveOccurred()) + Expect(names).To(BeEmpty()) + }) + }) }) func addFakeTerraformDeployments() {