From 2c37858f1dfa7b79cf63e3d4b5104602c8dc5fb2 Mon Sep 17 00:00:00 2001 From: Christopher Swenson Date: Thu, 9 Jun 2022 14:10:11 -0700 Subject: [PATCH 1/7] When database plugin Clean is called, close connections in goroutines We are seeing long pre-seal times when nodes are being shut down, which seems to be caused by delays in releasing the global stateLock, which in turn seems to be held up by cleaning up the builtin database plugin. The database plugin `Clean()` eventually calls, synchronously, `Close()` on every database instance that has been loaded. The database instance `Close()` calls can take a long time to complete (e.g., [mongo](https://github.com/hashicorp/vault/blob/v1.10.3/plugins/database/mongodb/connection_producer.go#L132) can take up to a minute to close its client before it will return). So, instead, we change the `Clean()` method to close all of the database instances in goroutines so they will be closed more quickly and won't block the release of the global stateLock. This should be safe because `Clean()` will only be called when the plugin is being unloaded, and so the connections will no longer be used. In addition, this adds metrics tracking how many databases of each type are being used by the builtin database plugin. 
--- builtin/logical/database/backend.go | 39 +++++++++++++++++++++++--- builtin/logical/database/mocks_test.go | 3 +- 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/builtin/logical/database/backend.go b/builtin/logical/database/backend.go index cd24943c2db8..97e014c946db 100644 --- a/builtin/logical/database/backend.go +++ b/builtin/logical/database/backend.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-secure-stdlib/strutil" "github.com/hashicorp/go-uuid" @@ -280,10 +281,30 @@ func (b *databaseBackend) GetConnectionWithConfig(ctx context.Context, name stri id: id, name: name, } + b.addConnectionsCounter(dbw, 1) b.connections[name] = dbi return dbi, nil } +// addConnectionsCounter is used to keep metrics on how many and what types of databases we have open +func (b *databaseBackend) addConnectionsCounter(dbw databaseVersionWrapper, amount int) { + // keep track of what databases we open + labels := make([]metrics.Label, 0, 2) + dbType, err := dbw.Type() + if err == nil { + labels = append(labels, metrics.Label{Name: "type", Value: dbType}) + } else { + b.Logger().Debug("Error getting database type", "err", err) + labels = append(labels, metrics.Label{Name: "type", Value: "unknown"}) + } + if dbw.isV4() { + labels = append(labels, metrics.Label{Name: "version", Value: "4"}) + } else { + labels = append(labels, metrics.Label{Name: "version", Value: "5"}) + } + metrics.IncrCounterWithLabels([]string{"secrets", "database", "backend", "connections", "count"}, float32(amount), labels) +} + // invalidateQueue cancels any background queue loading and destroys the queue. 
func (b *databaseBackend) invalidateQueue() { // cancel context before grabbing lock to start closing any open connections @@ -311,6 +332,7 @@ func (b *databaseBackend) clearConnection(name string) error { if ok { // Ignore error here since the database client is always killed db.Close() + b.addConnectionsCounter(db.database, -1) delete(b.connections, name) } return nil @@ -331,6 +353,7 @@ func (b *databaseBackend) CloseIfShutdown(db *dbPluginInstance, err error) { // Ensure we are deleting the correct connection mapDB, ok := b.connections[db.name] if ok && db.id == mapDB.id { + b.addConnectionsCounter(db.database, -1) delete(b.connections, db.name) } }() @@ -339,18 +362,26 @@ func (b *databaseBackend) CloseIfShutdown(db *dbPluginInstance, err error) { // clean closes all connections from all database types // and cancels any rotation queue loading operation. +// It spawns goroutines to close the databases since these +// are not guaranteed to finish quickly. func (b *databaseBackend) clean(ctx context.Context) { - // invalidateQueue acquires it's own lock on the backend, removes queue, and + // invalidateQueue acquires its own lock on the backend, removes queue, and // terminates the background ticker b.invalidateQueue() b.Lock() defer b.Unlock() - for _, db := range b.connections { - db.Close() - } + // copy all connections so we can close asynchronously + connectionsCopy := b.connections b.connections = make(map[string]*dbPluginInstance) + + for _, db := range connectionsCopy { + go func(db *dbPluginInstance) { + b.addConnectionsCounter(db.database, -1) + db.Close() + }(db) + } } const backendHelp = ` diff --git a/builtin/logical/database/mocks_test.go b/builtin/logical/database/mocks_test.go index 13eb53006142..71cc36f82abf 100644 --- a/builtin/logical/database/mocks_test.go +++ b/builtin/logical/database/mocks_test.go @@ -36,8 +36,7 @@ func (m *mockNewDatabase) DeleteUser(ctx context.Context, req v5.DeleteUserReque } func (m *mockNewDatabase) Type() (string, 
error) { - args := m.Called() - return args.String(0), args.Error(1) + return "mock", nil } func (m *mockNewDatabase) Close() error { From 389eba4c615e97d11bcf6de73e60adbeac4d32fe Mon Sep 17 00:00:00 2001 From: Christopher Swenson Date: Thu, 9 Jun 2022 15:34:06 -0700 Subject: [PATCH 2/7] Set type of mock plugin in setupMockDB --- builtin/logical/database/mocks_test.go | 3 ++- builtin/logical/database/rotation_test.go | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/builtin/logical/database/mocks_test.go b/builtin/logical/database/mocks_test.go index 71cc36f82abf..56ec966458d1 100644 --- a/builtin/logical/database/mocks_test.go +++ b/builtin/logical/database/mocks_test.go @@ -36,7 +36,8 @@ func (m *mockNewDatabase) DeleteUser(ctx context.Context, req v5.DeleteUserReque } func (m *mockNewDatabase) Type() (string, error) { - return "mock", nil + args := m.Called() + return args.Get(0).(string), args.Error(1) } func (m *mockNewDatabase) Close() error { diff --git a/builtin/logical/database/rotation_test.go b/builtin/logical/database/rotation_test.go index 863f1a01e3c6..9f6e65f9b79e 100644 --- a/builtin/logical/database/rotation_test.go +++ b/builtin/logical/database/rotation_test.go @@ -1379,6 +1379,7 @@ func setupMockDB(b *databaseBackend) *mockNewDatabase { mockDB := &mockNewDatabase{} mockDB.On("Initialize", mock.Anything, mock.Anything).Return(v5.InitializeResponse{}, nil) mockDB.On("Close").Return(nil) + mockDB.On("Type").Return("mock", nil) dbw := databaseVersionWrapper{ v5: mockDB, } From de3c0e74b2f77fc736b133dc618d7c929aa55d8d Mon Sep 17 00:00:00 2001 From: Christopher Swenson Date: Thu, 9 Jun 2022 16:05:59 -0700 Subject: [PATCH 3/7] Update builtin/logical/database/backend.go Co-authored-by: Calvin Leung Huang <1883212+calvn@users.noreply.github.com> --- builtin/logical/database/backend.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/builtin/logical/database/backend.go 
b/builtin/logical/database/backend.go index 97e014c946db..455a7028282a 100644 --- a/builtin/logical/database/backend.go +++ b/builtin/logical/database/backend.go @@ -289,19 +289,17 @@ func (b *databaseBackend) GetConnectionWithConfig(ctx context.Context, name stri // addConnectionsCounter is used to keep metrics on how many and what types of databases we have open func (b *databaseBackend) addConnectionsCounter(dbw databaseVersionWrapper, amount int) { // keep track of what databases we open - labels := make([]metrics.Label, 0, 2) dbType, err := dbw.Type() - if err == nil { - labels = append(labels, metrics.Label{Name: "type", Value: dbType}) - } else { + if err != nil { b.Logger().Debug("Error getting database type", "err", err) - labels = append(labels, metrics.Label{Name: "type", Value: "unknown"}) - } + dbType = "unknown" + } + version := 5 if dbw.isV4() { - labels = append(labels, metrics.Label{Name: "version", Value: "4"}) - } else { - labels = append(labels, metrics.Label{Name: "version", Value: "5"}) + version = 4 } + + labels := []metrics.Label{{"type", dbType}, {"version", version}} metrics.IncrCounterWithLabels([]string{"secrets", "database", "backend", "connections", "count"}, float32(amount), labels) } From 32cd2649a2426e2131d88ba86ee23d98afb1c86f Mon Sep 17 00:00:00 2001 From: Christopher Swenson Date: Thu, 9 Jun 2022 16:09:21 -0700 Subject: [PATCH 4/7] format --- builtin/logical/database/backend.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/builtin/logical/database/backend.go b/builtin/logical/database/backend.go index 455a7028282a..357c1cee1aa1 100644 --- a/builtin/logical/database/backend.go +++ b/builtin/logical/database/backend.go @@ -293,12 +293,12 @@ func (b *databaseBackend) addConnectionsCounter(dbw databaseVersionWrapper, amou if err != nil { b.Logger().Debug("Error getting database type", "err", err) dbType = "unknown" - } - version := 5 + } + version := "5" if dbw.isV4() { - version = 4 + version = "4" } - + 
labels := []metrics.Label{{"type", dbType}, {"version", version}} metrics.IncrCounterWithLabels([]string{"secrets", "database", "backend", "connections", "count"}, float32(amount), labels) } From 8d0c9be60d47a9d6f49d056cd95d4b08c7d49826 Mon Sep 17 00:00:00 2001 From: Christopher Swenson Date: Fri, 10 Jun 2022 11:48:32 -0700 Subject: [PATCH 5/7] Add tests Use a semaphore to attempt to wait for the backend plugins to finish, but don't wait too long. We can't use a WaitGroup (easily) because it doesn't allow us to pass a context or specify a timeout. --- builtin/logical/database/backend.go | 53 ++++++++++++++-- builtin/logical/database/backend_test.go | 80 ++++++++++++++++++++++++ 2 files changed, 129 insertions(+), 4 deletions(-) diff --git a/builtin/logical/database/backend.go b/builtin/logical/database/backend.go index 357c1cee1aa1..a1a6ab8e6480 100644 --- a/builtin/logical/database/backend.go +++ b/builtin/logical/database/backend.go @@ -6,6 +6,7 @@ import ( "net/rpc" "strings" "sync" + "sync/atomic" "time" "github.com/armon/go-metrics" @@ -19,6 +20,7 @@ import ( "github.com/hashicorp/vault/sdk/helper/locksutil" "github.com/hashicorp/vault/sdk/logical" "github.com/hashicorp/vault/sdk/queue" + "golang.org/x/sync/semaphore" ) const ( @@ -28,6 +30,32 @@ const ( minRootCredRollbackAge = 1 * time.Minute ) +// mutable for tests only +var cleanupMaxWaitTime = 500 * time.Millisecond + +// metrics collection +var ( + gaugeSync = sync.Mutex{} + gauges = map[string]*int32{} + gaugeKey = []string{"secrets", "database", "backend", "connections", "count"} +) + +func createGauge(name string) { + gaugeSync.Lock() + defer gaugeSync.Unlock() + gauges[name] = new(int32) +} + +func addToGauge(dbType, version string, amount int32) { + labels := []metrics.Label{{"type", dbType}, {"version", version}} + gaugeName := fmt.Sprintf("%s-%s", dbType, version) + if _, ok := gauges[gaugeName]; !ok { + createGauge(gaugeName) + } + val := atomic.AddInt32(gauges[gaugeName], int32(amount)) + 
metrics.SetGaugeWithLabels(gaugeKey, float32(val), labels) +} + type dbPluginInstance struct { sync.RWMutex database databaseVersionWrapper @@ -298,9 +326,7 @@ func (b *databaseBackend) addConnectionsCounter(dbw databaseVersionWrapper, amou if dbw.isV4() { version = "4" } - - labels := []metrics.Label{{"type", dbType}, {"version", version}} - metrics.IncrCounterWithLabels([]string{"secrets", "database", "backend", "connections", "count"}, float32(amount), labels) + addToGauge(dbType, version, int32(amount)) } // invalidateQueue cancels any background queue loading and destroys the queue. @@ -363,6 +389,9 @@ func (b *databaseBackend) CloseIfShutdown(db *dbPluginInstance, err error) { // It spawns goroutines to close the databases since these // are not guaranteed to finish quickly. func (b *databaseBackend) clean(ctx context.Context) { + cleanupCtx, cancel := context.WithDeadline(ctx, time.Now().Add(cleanupMaxWaitTime)) + defer cancel() + // invalidateQueue acquires its own lock on the backend, removes queue, and // terminates the background ticker b.invalidateQueue() @@ -374,12 +403,28 @@ func (b *databaseBackend) clean(ctx context.Context) { connectionsCopy := b.connections b.connections = make(map[string]*dbPluginInstance) + // we will try to wait for all the connections to close + // ... 
but not too long + sem := semaphore.NewWeighted(int64(len(connectionsCopy))) + err := sem.Acquire(cleanupCtx, int64(len(connectionsCopy))) + if err != nil { + b.Logger().Debug("Error acquiring semaphore; ignoring", "error", err) + } + for _, db := range connectionsCopy { go func(db *dbPluginInstance) { + defer sem.Release(1) b.addConnectionsCounter(db.database, -1) - db.Close() + err := db.Close() + if err != nil { + b.Logger().Debug("Error closing database while cleaning up plugin; ignoring", "error", err) + } }(db) } + err = sem.Acquire(cleanupCtx, int64(len(connectionsCopy))) + if err != nil { + b.Logger().Debug("Error in cleanup semaphore; ignoring", "error", err) + } } const backendHelp = ` diff --git a/builtin/logical/database/backend_test.go b/builtin/logical/database/backend_test.go index cf700c6d71fb..085789bd60d3 100644 --- a/builtin/logical/database/backend_test.go +++ b/builtin/logical/database/backend_test.go @@ -20,6 +20,7 @@ import ( "github.com/hashicorp/vault/plugins/database/mongodb" "github.com/hashicorp/vault/plugins/database/postgresql" v4 "github.com/hashicorp/vault/sdk/database/dbplugin" + "github.com/hashicorp/vault/sdk/database/dbplugin/v5" v5 "github.com/hashicorp/vault/sdk/database/dbplugin/v5" "github.com/hashicorp/vault/sdk/database/helper/dbutil" "github.com/hashicorp/vault/sdk/framework" @@ -1461,6 +1462,85 @@ func TestBackend_ConnectionURL_redacted(t *testing.T) { } } +type hangingPlugin struct{} + +func (h hangingPlugin) Initialize(ctx context.Context, req v5.InitializeRequest) (v5.InitializeResponse, error) { + return v5.InitializeResponse{ + Config: req.Config, + }, nil +} + +func (h hangingPlugin) NewUser(ctx context.Context, req v5.NewUserRequest) (v5.NewUserResponse, error) { + return v5.NewUserResponse{}, nil +} + +func (h hangingPlugin) UpdateUser(ctx context.Context, req v5.UpdateUserRequest) (v5.UpdateUserResponse, error) { + return v5.UpdateUserResponse{}, nil +} + +func (h hangingPlugin) DeleteUser(ctx context.Context, 
req v5.DeleteUserRequest) (v5.DeleteUserResponse, error) { + return v5.DeleteUserResponse{}, nil +} + +func (h hangingPlugin) Type() (string, error) { + return "hanging", nil +} + +func (h hangingPlugin) Close() error { + time.Sleep(1000 * time.Second) + return nil +} + +var _ dbplugin.Database = (*hangingPlugin)(nil) + +func TestBackend_PluginMain_Hanging(t *testing.T) { + v5.Serve(&hangingPlugin{}) +} + +func TestBackend_Closes_Cleanly_Even_If_Plugin_Hangs(t *testing.T) { + cleanupMaxWaitTime = 100 * time.Millisecond + cluster, sys := getCluster(t) + vault.TestAddTestPlugin(t, cluster.Cores[0].Core, "hanging-plugin", consts.PluginTypeDatabase, "TestBackend_PluginMain_Hanging", []string{}, "") + t.Cleanup(cluster.Cleanup) + + config := logical.TestBackendConfig() + config.StorageView = &logical.InmemStorage{} + config.System = sys + + b, err := Factory(context.Background(), config) + if err != nil { + t.Fatal(err) + } + + // Configure a connection + data := map[string]interface{}{ + "connection_url": "doesn't matter", + "plugin_name": "hanging-plugin", + "allowed_roles": []string{"plugin-role-test"}, + } + req := &logical.Request{ + Operation: logical.UpdateOperation, + Path: "config/hang", + Storage: config.StorageView, + Data: data, + } + _, err = b.HandleRequest(namespace.RootContext(nil), req) + if err != nil { + t.Fatalf("err: %v", err) + } + timeout := time.NewTimer(750 * time.Millisecond) + done := make(chan bool) + go func() { + b.Cleanup(context.Background()) + done <- true + }() + select { + case <-timeout.C: + t.Error("Hanging plugin caused Close() to time out") + case <-done: + } +} + func testCredsExist(t *testing.T, resp *logical.Response, connURL string) bool { t.Helper() var d struct { From e198f86b5e070557118a02e849a0e1e65f15c6ed Mon Sep 17 00:00:00 2001 From: Christopher Swenson Date: Fri, 10 Jun 2022 12:11:54 -0700 Subject: [PATCH 6/7] Remove unnecessary type conversion --- builtin/logical/database/backend.go | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/builtin/logical/database/backend.go b/builtin/logical/database/backend.go index a1a6ab8e6480..f98a1dda1445 100644 --- a/builtin/logical/database/backend.go +++ b/builtin/logical/database/backend.go @@ -52,7 +52,7 @@ func addToGauge(dbType, version string, amount int32) { if _, ok := gauges[gaugeName]; !ok { createGauge(gaugeName) } - val := atomic.AddInt32(gauges[gaugeName], int32(amount)) + val := atomic.AddInt32(gauges[gaugeName], amount) metrics.SetGaugeWithLabels(gaugeKey, float32(val), labels) } From 2b915944f1807a68609af6792fd2d760eab77c06 Mon Sep 17 00:00:00 2001 From: Christopher Swenson Date: Fri, 10 Jun 2022 14:18:40 -0700 Subject: [PATCH 7/7] Prevent hanging plugin from being run as a normal test --- builtin/logical/database/backend_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/builtin/logical/database/backend_test.go b/builtin/logical/database/backend_test.go index 085789bd60d3..cc82e7812262 100644 --- a/builtin/logical/database/backend_test.go +++ b/builtin/logical/database/backend_test.go @@ -1494,6 +1494,9 @@ func (h hangingPlugin) Close() error { var _ dbplugin.Database = (*hangingPlugin)(nil) func TestBackend_PluginMain_Hanging(t *testing.T) { + if os.Getenv(pluginutil.PluginVaultVersionEnv) == "" { + return + } v5.Serve(&hangingPlugin{}) }