fix: allow lock file path to be set via config file #1118

Merged · 6 commits · Oct 9, 2024
44 changes: 24 additions & 20 deletions cmd/serve.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"database/sql"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"log/slog"
@@ -241,43 +242,46 @@ func startServer(registry pakBroker.BrokerRegistry, db *sql.DB, brokerapi http.H
 	}
 	go func() {
 		var err error
-		if tlsCertCaBundleFilePath != "" && tlsKeyFilePath != "" {
+		switch {
+		case tlsCertCaBundleFilePath != "" && tlsKeyFilePath != "":
 			err = httpServer.ListenAndServeTLS(tlsCertCaBundleFilePath, tlsKeyFilePath)
-		} else {
+		default:
 			err = httpServer.ListenAndServe()
 		}
-		if err == http.ErrServerClosed {
-			logger.Info("shutting down csb")
-		} else {
+
+		if !errors.Is(err, http.ErrServerClosed) {
 			logger.Fatal("Failed to start broker", err)
 		}
+		logger.Info("shutting down csb")
 	}()

 	return httpServer
 }

 func listenForShutdownSignal(httpServer *http.Server, logger lager.Logger, store *storage.Storage) {
 	sigChan := make(chan os.Signal, 1)
 	signal.Notify(sigChan, syscall.SIGTERM)

-	signalReceived := <-sigChan
+	sig := <-sigChan
+	ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
+	defer cancel()

-	switch signalReceived {
-	case syscall.SIGTERM:
-		shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), shutdownTimeout)
-		if err := httpServer.Shutdown(shutdownCtx); err != nil {
-			logger.Fatal("shutdown error: %v", err)
-		}
-		logger.Info("received SIGTERM, server is shutting down gracefully allowing for in flight work to finish")
-		defer shutdownRelease()
-		for store.LockFilesExist() {
-			logger.Info("draining csb in progress")
-			time.Sleep(time.Second * 1)
-		}
-		logger.Info("draining complete")
-	default:
-		logger.Info(fmt.Sprintf("csb does not handle the %s interrupt signal", signalReceived))
+	if err := httpServer.Shutdown(ctx); err != nil {
+		logger.Fatal("shutdown error: %v", err)
+	}
+	logger.Info("server is shutting down gracefully allowing for in flight work to finish", lager.Data{"signal": sig})
+
+	ticker := time.NewTicker(time.Second * 5)
+	defer ticker.Stop()
+
+	for range ticker.C {
+		if !store.LockFilesExist() {
+			break
+		}
+		logger.Info("draining csb in progress")
 	}
+
+	logger.Info("draining complete")
 }

 func importStateHandler(store *storage.Storage) http.Handler {
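
The new shutdown flow is easier to read outside the diff. Below is a minimal, self-contained sketch of the same pattern: SIGTERM triggers http.Server.Shutdown, then a ticker polls until lock files are gone. The port, timeouts, and lockFilesExist helper are stand-ins for the broker's real configuration and storage.Storage.LockFilesExist, not part of this PR.

// Minimal sketch of the shutdown flow above. Assumptions: port, timeouts,
// and lockFilesExist are placeholders for the broker's config and storage.
package main

import (
	"context"
	"errors"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func lockFilesExist() bool { return false } // placeholder for store.LockFilesExist()

func main() {
	httpServer := &http.Server{Addr: ":8080"}

	go func() {
		// ErrServerClosed is the expected result of a graceful Shutdown.
		if err := httpServer.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
			log.Fatalf("failed to start broker: %v", err)
		}
	}()

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGTERM)
	sig := <-sigChan

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := httpServer.Shutdown(ctx); err != nil {
		log.Fatalf("shutdown error: %v", err)
	}
	log.Printf("shutting down gracefully after %s, waiting for in-flight work", sig)

	// Drain: poll every 5 seconds until no lock files remain.
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		if !lockFilesExist() {
			break
		}
		log.Println("draining in progress")
	}
	log.Println("draining complete")
}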
17 changes: 7 additions & 10 deletions integrationtest/termination_recovery_test.go
@@ -53,19 +53,15 @@ var _ = Describe("Recovery From Broker Termination", func() {
 			Expect(response.Error).NotTo(HaveOccurred())
 			Expect(response.StatusCode).To(Equal(http.StatusAccepted))
 			Eventually(stdout, time.Second*5).Should(Say(`tofu","apply","-auto-approve"`))

 			By("gracefully stopping the broker")
-			// Stop seems to be blocking, so run it in a routine so we can check that the broker actually rejects requests until it's fully stopped.
-			go func() {
-				defer GinkgoRecover()
-				Expect(broker.Stop()).To(Succeed())
-			}()
+			Expect(broker.RequestStop()).To(Succeed())

-			By("logging a message")
-			Eventually(stdout).Should(Say("received SIGTERM"))
-			Eventually(stdout).Should(Say("draining csb in progress"))
+			By("checking that the broker logged a message")
+			Eventually(stdout).Should(Say("server is shutting down gracefully allowing for in flight work to finish"))

-			By("ensuring that the broker rejects requests")
-			Expect(broker.Client.LastOperation(instanceGUID, uuid.NewString()).Error).To(HaveOccurred())
+			By("ensuring that the broker rejects subsequent requests")
+			Expect(broker.Client.LastOperation(instanceGUID, uuid.NewString()).Error).To(MatchError(ContainSubstring("connect: connection refused")))

 			// Fun stuff, do not optimize this with a SatisfyAll().. The relevant part of the docs is:
 			// When Say succeeds, it fast forwards the gbytes.Buffer's read cursor to just after the successful match.
@@ -89,6 +85,7 @@ var _ = Describe("Recovery From Broker Termination", func() {
 			Expect(broker.Deprovision(si)).To(Succeed())
 		})
 	})
+
 	Describe("when a vm broker did not properly drain", func() {
 		var dirDefault string
 		BeforeEach(func() {
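
The "do not optimize this with a SatisfyAll()" comment is worth illustrating: each successful Say match fast-forwards the gbytes buffer's read cursor, so consecutive assertions verify the order of log lines, not merely their presence. A small self-contained sketch (test name and strings are made up for illustration):

package example_test

import (
	"fmt"
	"testing"

	. "github.com/onsi/gomega"
	"github.com/onsi/gomega/gbytes"
)

func TestSayAdvancesTheCursor(t *testing.T) {
	g := NewWithT(t)

	buffer := gbytes.NewBuffer()
	fmt.Fprint(buffer, "first event\nsecond event\n")

	// Each successful Say fast-forwards the read cursor past the match,
	// so these two assertions only pass if the lines appear in this order.
	g.Expect(buffer).To(gbytes.Say("first event"))
	g.Expect(buffer).To(gbytes.Say("second event"))

	// A second Say for "first event" now fails: it is behind the cursor.
	g.Expect(buffer).NotTo(gbytes.Say("first event"))
}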
9 changes: 8 additions & 1 deletion internal/storage/storage.go
@@ -4,9 +4,16 @@ package storage
 import (
 	"os"

+	"github.com/spf13/viper"
 	"gorm.io/gorm"
 )

+const lockfileDir = "lockfiledir"
+
+func init() {
+	viper.BindEnv(lockfileDir, "CSB_LOCKFILE_DIR")
+}
+
 type Storage struct {
 	db *gorm.DB
 	encryptor Encryptor
@@ -18,7 +25,7 @@ func New(db *gorm.DB, encryptor Encryptor) *Storage {
 	// but not every environment will opt for using VM based deployments. So detect if the lockfile
 	// director is present.

-	dirDefault := os.Getenv("CSB_LOCKFILE_DIR")
+	dirDefault := viper.GetString(lockfileDir)
 	if _, err := os.Stat(dirDefault); err != nil {
 		dirDefault, _ = os.MkdirTemp("/tmp/", "lockfiles")
 	}
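
Replacing os.Getenv with a viper key is what lets the lock file directory come from a config file as well as the environment. A sketch of the resulting precedence, assuming a hypothetical config.yml; in viper, values bound via BindEnv take priority over config-file entries, so existing CSB_LOCKFILE_DIR deployments keep working:

package main

import (
	"fmt"

	"github.com/spf13/viper"
)

func main() {
	// Same binding as storage's init(): the "lockfiledir" key reads
	// CSB_LOCKFILE_DIR whenever that variable is set.
	_ = viper.BindEnv("lockfiledir", "CSB_LOCKFILE_DIR")

	// Hypothetical config file containing e.g. "lockfiledir: /var/csb/locks".
	viper.SetConfigFile("config.yml")
	_ = viper.ReadInConfig() // ignore a missing file for this sketch

	// Env beats the config file in viper's precedence order.
	fmt.Println("lock file dir:", viper.GetString("lockfiledir"))
}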
14 changes: 13 additions & 1 deletion internal/testdrive/broker.go
@@ -18,15 +18,27 @@ type Broker struct {
 	Stderr *bytes.Buffer
 }

+// Stop requests that the broker stops and waits for it to exit
 func (b *Broker) Stop() error {
 	switch {
 	case b == nil, b.runner == nil:
 		return nil
 	default:
-		return b.runner.gracefullStop()
+		return b.runner.gracefulStop()
 	}
 }

+// RequestStop requests that the broker stop, but does not wait for exit
+func (b *Broker) RequestStop() error {
+	switch {
+	case b == nil, b.runner == nil:
+		return nil
+	default:
+		return b.runner.requestStop()
+	}
+}
+
+// Terminate forces the broker to stop
 func (b *Broker) Terminate() error {
 	switch {
 	case b == nil, b.runner == nil:
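
Stop, RequestStop, and Terminate all share the same guard, which makes them safe to call on a broker that was never started. A stripped-down sketch of that pattern, with the types reduced to the minimum needed to compile:

package main

import "fmt"

type runner struct{}

func (r *runner) requestStop() error { return nil }

type Broker struct{ runner *runner }

// RequestStop mirrors the testdrive helper: nil-safe at every level.
func (b *Broker) RequestStop() error {
	switch {
	case b == nil, b.runner == nil:
		return nil
	default:
		return b.runner.requestStop()
	}
}

func main() {
	var b *Broker                          // broker that was never started
	fmt.Println(b.RequestStop())           // <nil>: nil receiver, no panic
	fmt.Println((&Broker{}).RequestStop()) // <nil>: nil runner, no panic
}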
39 changes: 30 additions & 9 deletions internal/testdrive/runner.go
@@ -27,14 +27,14 @@ func (r *runner) error(err error) *runner {
 	return r
 }

-func (r *runner) gracefullStop() error {
+// gracefulStop sends a SIGTERM and waits for the process to stop
+// See also: requestStop(), forceStop()
+func (r *runner) gracefulStop() error {
 	if r.exited {
 		return nil
 	}
-	if r.cmd != nil && r.cmd.Process != nil {
-		if err := r.cmd.Process.Signal(syscall.SIGTERM); err != nil {
-			return err
-		}
+	if err := r.signal(syscall.SIGTERM); err != nil {
+		return err
 	}

 	for !r.exited {
@@ -44,14 +44,25 @@
 	return nil
 }

+// requestStop sends a SIGTERM and does not wait
+// See also: gracefulStop(), forceStop()
+func (r *runner) requestStop() error {
+	if r.exited {
+		return nil
+	}
+	if err := r.signal(syscall.SIGTERM); err != nil {
+		return err
+	}
+
+	return nil
+}
+
 func (r *runner) forceStop() error {
 	if r.exited {
 		return nil
 	}
-	if r.cmd != nil && r.cmd.Process != nil {
-		if err := r.cmd.Process.Signal(syscall.SIGKILL); err != nil {
-			return err
-		}
+	if err := r.signal(syscall.SIGKILL); err != nil {
+		return err
 	}

 	for !r.exited {
@@ -61,6 +72,16 @@
 	return nil
 }

+func (r *runner) signal(sig syscall.Signal) error {
+	if r.cmd != nil && r.cmd.Process != nil {
+		if err := r.cmd.Process.Signal(sig); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 // monitor waits for the command to exit and cleans up. It is typically run as a goroutine
 func (r *runner) monitor() {
 	r.err = r.cmd.Wait()
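
Outside the testdrive package, the request/wait split looks like this with plain os/exec: requestStop is just a signal, gracefulStop is signal plus wait, and forceStop swaps in SIGKILL. The child command and timeout here are arbitrary; only the pattern mirrors the runner:

package main

import (
	"log"
	"os/exec"
	"syscall"
	"time"
)

func main() {
	cmd := exec.Command("sleep", "60") // arbitrary long-running child
	if err := cmd.Start(); err != nil {
		log.Fatal(err)
	}

	// Equivalent of runner.monitor: one goroutine owns cmd.Wait.
	done := make(chan error, 1)
	go func() { done <- cmd.Wait() }()

	// "requestStop": deliver SIGTERM and return immediately.
	if cmd.Process != nil {
		if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
			log.Fatal(err)
		}
	}

	// "gracefulStop": additionally wait for the exit, falling back to
	// SIGKILL ("forceStop") if the child ignores the request.
	select {
	case err := <-done:
		log.Printf("child exited: %v", err)
	case <-time.After(10 * time.Second):
		_ = cmd.Process.Signal(syscall.SIGKILL)
		log.Printf("child killed after timeout: %v", <-done)
	}
}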