Skip to content

Commit

Permalink
feat: WIP cleanup existing lockfiles on start and mark as failed
Browse files Browse the repository at this point in the history
  • Loading branch information
ifindlay-cci authored and FelisiaM committed Sep 10, 2024
1 parent cb52fd4 commit e6c3c23
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 95 deletions.
4 changes: 3 additions & 1 deletion cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ func serve() {
}
var serviceBroker domain.ServiceBroker
csbStore := storage.New(db, encryptor)
csbStore.RecoverInProgressOperations(logger)

serviceBroker, err = osbapiBroker.New(cfg, csbStore, logger)
if err != nil {
logger.Fatal("Error initializing service broker", err)
Expand Down Expand Up @@ -140,7 +142,7 @@ func serve() {
if err != nil {
logger.Error("failed to get database connection", err)
}
startServer(cfg.Registry, sqldb, brokerAPI, storage.New(db, encryptor), credentials)
startServer(cfg.Registry, sqldb, brokerAPI, csbStore, credentials)
}

func serveDocs() {
Expand Down
10 changes: 0 additions & 10 deletions dbservice/dbservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package dbservice

import (
"fmt"
"os"
"sync"

"code.cloudfoundry.org/lager/v3"
Expand All @@ -44,15 +43,6 @@ func NewWithMigrations(logger lager.Logger) *gorm.DB {
if err := RunMigrations(db); err != nil {
panic(fmt.Sprintf("Error migrating database: %s", err))
}
// We only wan't to fail interrupted service instances if we detect that we run as a CF APP.
// VM based csb instances implement a drain mechanism and should need this. Additionally VM
// based csb deployments are scalable horizontally and the below would fail in flight instances
// of another csb process.
if os.Getenv("CF_INSTANCE_GUID") != "" {
if err := recoverInProgressOperations(db, logger); err != nil {
panic(fmt.Sprintf("Error recovering in-progress operations: %s", err))
}
}
})
return db
}
24 changes: 0 additions & 24 deletions dbservice/recover_in_progress_operations.go

This file was deleted.

60 changes: 0 additions & 60 deletions dbservice/recover_in_progress_operations_test.go

This file was deleted.

51 changes: 51 additions & 0 deletions internal/storage/recover_in_progress_operations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package storage

import (
"os"

"code.cloudfoundry.org/lager/v3"
"github.com/cloudfoundry/cloud-service-broker/v2/dbservice/models"
"gorm.io/gorm"
)

// RecoverInProgressOperations marks Terraform deployments whose last operation
// was interrupted by a broker restart as "failed".
//
// When the CF_INSTANCE_GUID environment variable is set (the broker runs as a
// CF app), every deployment whose last operation state is "in progress" is
// marked as failed, in batches of 100. Otherwise only the deployments reported
// by LockedDeploymentIds are marked as failed.
//
// Every deployment that is updated is logged as "mark-as-failed" in the
// "recover-in-progress-operations" logger session. The lock files themselves
// are not removed by this method.
//
// It returns the first error hit while listing lock files or while reading or
// saving a deployment; deployments updated before the error stay updated.
func (s *Storage) RecoverInProgressOperations(logger lager.Logger) error {
	const (
		failedState   = "failed"
		failedMessage = "the broker restarted while the operation was in progress"
	)

	logger = logger.Session("recover-in-progress-operations")

	// Only fail every in-progress operation when we detect that we run as a CF
	// app. VM based csb deployments are horizontally scalable, so doing this
	// there would also fail the in-flight operations of another csb process.
	if os.Getenv("CF_INSTANCE_GUID") != "" {
		var terraformDeploymentBatch []models.TerraformDeployment
		result := s.db.Where("last_operation_state = ?", "in progress").FindInBatches(&terraformDeploymentBatch, 100, func(tx *gorm.DB, batchNumber int) error {
			for i := range terraformDeploymentBatch {
				terraformDeploymentBatch[i].LastOperationState = failedState
				terraformDeploymentBatch[i].LastOperationMessage = failedMessage
				logger.Info("mark-as-failed", lager.Data{"workspace_id": terraformDeploymentBatch[i].ID})
			}

			return tx.Save(&terraformDeploymentBatch).Error
		})

		return result.Error
	}

	// On a VM a lock file identifies an operation owned by this process that
	// never completed, so only those deployments are failed.
	deploymentIDs, err := s.LockedDeploymentIds()
	if err != nil {
		return err
	}

	for _, id := range deploymentIDs {
		var receiver models.TerraformDeployment
		if err := s.db.Where("id = ?", id).First(&receiver).Error; err != nil {
			return err
		}

		receiver.LastOperationState = failedState
		receiver.LastOperationMessage = failedMessage

		// Save needs a pointer to the model, as in the batch branch above.
		if err := s.db.Save(&receiver).Error; err != nil {
			return err
		}
		logger.Info("mark-as-failed", lager.Data{"workspace_id": receiver.ID})
	}

	return nil
}
120 changes: 120 additions & 0 deletions internal/storage/recover_in_progress_operations_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package storage_test

import (
"errors"
"os"
"strings"

"code.cloudfoundry.org/lager/v3/lagertest"
"github.com/cloudfoundry/cloud-service-broker/v2/dbservice/models"
"github.com/cloudfoundry/cloud-service-broker/v2/internal/storage"
"github.com/cloudfoundry/cloud-service-broker/v2/internal/storage/storagefakes"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)

const (
// recoverID is the ID of the fixture deployment created in the "in progress"
// state; the specs expect it to be marked as failed.
recoverID = "fake-id-to-recover"
// okID is the ID of the fixture deployment created in the "succeeded" state;
// the specs expect it to be left untouched.
okID = "fake-id-that-does-not-need-to-be-recovered"
)

// Specs for Storage.RecoverInProgressOperations, covering both the CF app path
// (CF_INSTANCE_GUID set) and the VM path (lock file present).
var _ = Describe("RecoverInProgressOperations()", func() {
	BeforeEach(func() {
		// Assign to the suite-level db and encryptor with "=" rather than ":=".
		// A shadowed local db would leave the specs below querying a different
		// database from the one the store writes to.
		var err error
		db, err = gorm.Open(sqlite.Open(":memory:"), nil)
		Expect(err).NotTo(HaveOccurred())
		Expect(db.Migrator().CreateTable(&models.TerraformDeployment{})).To(Succeed())

		// One deployment that needs recovering and one that must stay untouched.
		Expect(db.Create(&models.TerraformDeployment{
			ID:                   recoverID,
			LastOperationType:    "fake-type",
			LastOperationState:   "in progress",
			LastOperationMessage: "fake-type in progress",
		}).Error).To(Succeed())
		Expect(db.Create(&models.TerraformDeployment{
			ID:                   okID,
			LastOperationType:    "fake-type",
			LastOperationState:   "succeeded",
			LastOperationMessage: "fake-type succeeded",
		}).Error).To(Succeed())

		encryptor = &storagefakes.FakeEncryptor{
			DecryptStub: func(bytes []byte) ([]byte, error) {
				if string(bytes) == `cannot-be-decrypted` {
					return nil, errors.New("fake decryption error")
				}
				return bytes, nil
			},
			EncryptStub: func(bytes []byte) ([]byte, error) {
				if strings.Contains(string(bytes), `cannot-be-encrypted`) {
					return nil, errors.New("fake encryption error")
				}
				return []byte(`{"encrypted":` + string(bytes) + `}`), nil
			},
		}

		logger = lagertest.NewTestLogger("test")
		store = storage.New(db, encryptor)
	})

	When("running as a cf app", func() {
		It("recovers the expected operations", func() {
			// The presence of this variable means we are running as an App.
			Expect(os.Setenv("CF_INSTANCE_GUID", "something")).To(Succeed())
			DeferCleanup(os.Unsetenv, "CF_INSTANCE_GUID")

			// Call the function
			Expect(store.RecoverInProgressOperations(logger)).To(Succeed())

			// Behaviors
			By("marking the in-progress operation as failed")
			var r1 models.TerraformDeployment
			Expect(db.Where("id = ?", recoverID).First(&r1).Error).To(Succeed())
			Expect(r1.LastOperationState).To(Equal("failed"))
			Expect(r1.LastOperationMessage).To(Equal("the broker restarted while the operation was in progress"))

			By("no updating other operations")
			var r2 models.TerraformDeployment
			Expect(db.Where("id = ?", okID).First(&r2).Error).To(Succeed())
			Expect(r2.LastOperationState).To(Equal("succeeded"))
			Expect(r2.LastOperationMessage).To(Equal("fake-type succeeded"))

			By("logging the expected message")
			Expect(logger.Buffer().Contents()).To(SatisfyAll(
				ContainSubstring(`"message":"test.recover-in-progress-operations.mark-as-failed"`),
				ContainSubstring(`"workspace_id":"fake-id-to-recover"`),
			))
		})
	})

	When("running on a VM", func() {
		It("recovers the expected operations", func() {
			// When running on a VM there will be a lockfile and record in the db
			Expect(store.WriteLockFile(recoverID)).To(Succeed())

			// Call the function
			Expect(store.RecoverInProgressOperations(logger)).To(Succeed())

			// Behaviors
			By("marking the in-progress operation as failed")
			var r1 models.TerraformDeployment
			Expect(db.Where("id = ?", recoverID).First(&r1).Error).To(Succeed())
			Expect(r1.LastOperationState).To(Equal("failed"))
			Expect(r1.LastOperationMessage).To(Equal("the broker restarted while the operation was in progress"))

			By("no updating other operations")
			var r2 models.TerraformDeployment
			Expect(db.Where("id = ?", okID).First(&r2).Error).To(Succeed())
			Expect(r2.LastOperationState).To(Equal("succeeded"))
			Expect(r2.LastOperationMessage).To(Equal("fake-type succeeded"))

			By("logging the expected message")
			Expect(logger.Buffer().Contents()).To(SatisfyAll(
				ContainSubstring(`"message":"test.recover-in-progress-operations.mark-as-failed"`),
				ContainSubstring(`"workspace_id":"fake-id-to-recover"`),
			))
		})
	})
})
2 changes: 2 additions & 0 deletions internal/storage/storage_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"
"testing"

"code.cloudfoundry.org/lager/v3/lagertest"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"gorm.io/driver/sqlite"
Expand All @@ -19,6 +20,7 @@ var (
db *gorm.DB
encryptor *storagefakes.FakeEncryptor
store *storage.Storage
logger *lagertest.TestLogger
)

func TestStorage(t *testing.T) {
Expand Down
18 changes: 18 additions & 0 deletions internal/storage/terraform_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,24 @@ func (s *Storage) RemoveLockFile(deploymentID string) error {
return os.Remove(fmt.Sprintf("%s/%s", s.lockFileDir, sanitizeFileName(deploymentID)))
}

// GetLockFileNames returns the names of all entries in the lock file
// directory, exactly as they appear on disk. RemoveLockFile addresses files by
// sanitizeFileName(deploymentID), so these are file names rather than
// deployment IDs.
//
// If the directory cannot be read, it returns a nil slice and the error from
// os.ReadDir; no partial listing is returned.
func (s *Storage) GetLockFileNames() ([]string, error) {
	entries, err := os.ReadDir(s.lockFileDir)
	if err != nil {
		return nil, err
	}

	var names []string
	for _, entry := range entries {
		names = append(names, entry.Name())
	}
	return names, nil
}

// LockedDeploymentIds returns the deployment IDs that currently have a lock
// file, derived from the entry names in the lock file directory.
//
// It reverses sanitizeFileName by replacing every "_" with ":". That mapping
// is lossy: an ID that originally contained "_" comes back with ":" instead.
//
// If the directory cannot be read, it returns a nil slice and the error from
// os.ReadDir; no partial listing is returned.
func (s *Storage) LockedDeploymentIds() ([]string, error) {
	entries, err := os.ReadDir(s.lockFileDir)
	if err != nil {
		return nil, err
	}

	var ids []string
	for _, entry := range entries {
		ids = append(ids, strings.ReplaceAll(entry.Name(), "_", ":"))
	}
	return ids, nil
}

// sanitizeFileName converts a deployment ID into a lock file name by
// substituting an underscore for every colon in it.
func sanitizeFileName(name string) string {
	const (
		unsafeSeparator = ":"
		safeSeparator   = "_"
	)
	// A negative count replaces every occurrence.
	sanitized := strings.Replace(name, unsafeSeparator, safeSeparator, -1)
	return sanitized
}
30 changes: 30 additions & 0 deletions internal/storage/terraform_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"os"
"strings"

"github.com/hashicorp/go-version"
Expand Down Expand Up @@ -34,6 +35,7 @@ var _ = Describe("TerraformDeployments", func() {
},
}

os.RemoveAll("/tmp/csb/")
store = storage.New(db, encryptor)
})

Expand Down Expand Up @@ -222,6 +224,34 @@ var _ = Describe("TerraformDeployments", func() {
Expect(store.LockFilesExist()).To(BeFalse())
})
})

Describe("GetLockFileNames", func() {
	// Deliberately It and not FIt: a focused spec would skip every other spec
	// in the suite.
	It("returns correct names", func() {
		names, err := store.GetLockFileNames()
		Expect(err).NotTo(HaveOccurred())
		Expect(names).To(BeEmpty())

		Expect(store.WriteLockFile("1234")).To(Succeed())
		Expect(store.WriteLockFile("5678")).To(Succeed())

		// ConsistOf pins the exact set, so stray lock files are detected too.
		names, err = store.GetLockFileNames()
		Expect(err).NotTo(HaveOccurred())
		Expect(names).To(ConsistOf("1234", "5678"))

		Expect(store.RemoveLockFile("1234")).To(Succeed())

		names, err = store.GetLockFileNames()
		Expect(err).NotTo(HaveOccurred())
		Expect(names).To(ConsistOf("5678"))

		Expect(store.RemoveLockFile("5678")).To(Succeed())

		names, err = store.GetLockFileNames()
		Expect(err).NotTo(HaveOccurred())
		Expect(names).To(BeEmpty())
	})
})
})

func addFakeTerraformDeployments() {
Expand Down

0 comments on commit e6c3c23

Please sign in to comment.