From bc2497214194aea7b2a3cd724b4b64bccb1e1b80 Mon Sep 17 00:00:00 2001 From: Christopher Swenson Date: Fri, 10 Jun 2022 16:19:15 -0700 Subject: [PATCH] Cleanup and simplify lock usage in database plugin Following up from discussions in #15923 and #15933, I wanted to split out a separate PR that drastically reduced the complexity of the use of the databaseBackend lock. We no longer need it at all for the `credRotationQueue`, and we can move it to be solely used in a few, small connections map management functions. --- builtin/logical/database/backend.go | 125 +++++++++--------- .../database/path_config_connection.go | 10 +- .../database/path_rotate_credentials.go | 6 +- builtin/logical/database/rotation.go | 21 ++- 4 files changed, 78 insertions(+), 84 deletions(-) diff --git a/builtin/logical/database/backend.go b/builtin/logical/database/backend.go index cd24943c2db8..c63eaccdb650 100644 --- a/builtin/logical/database/backend.go +++ b/builtin/logical/database/backend.go @@ -58,10 +58,9 @@ func Factory(ctx context.Context, conf *logical.BackendConfig) (logical.Backend, // Create a context with a cancel method for processing any WAL entries and // populating the queue initCtx := context.Background() - ictx, cancel := context.WithCancel(initCtx) - b.cancelQueue = cancel + b.ctx, b.cancelQueue = context.WithCancel(initCtx) // Load queue and kickoff new periodic ticker - go b.initQueue(ictx, conf, conf.System.ReplicationState()) + go b.initQueue(b.ctx, conf, conf.System.ReplicationState()) return b, nil } @@ -110,13 +109,14 @@ func Backend(conf *logical.BackendConfig) *databaseBackend { } type databaseBackend struct { + // used to synchronize access to the connections map + connLock sync.RWMutex // connections holds configured database connections by config name connections map[string]*dbPluginInstance logger log.Logger *framework.Backend - sync.RWMutex - // CredRotationQueue is an in-memory priority queue used to track Static Roles + // credRotationQueue is an in-memory priority queue used to track Static Roles // that require periodic rotation. Backends will have a PriorityQueue // initialized on setup, but only backends that are mounted by a primary // server or mounted as a local mount will perform the rotations. @@ -124,7 +124,9 @@ type databaseBackend struct { // cancelQueue is used to remove the priority queue and terminate the // background ticker. credRotationQueue *queue.PriorityQueue - cancelQueue context.CancelFunc + // context used for canceling operations + ctx context.Context + cancelQueue context.CancelFunc // roleLocks is used to lock modifications to roles in the queue, to ensure // concurrent requests are not modifying the same role and possibly causing @@ -132,6 +134,36 @@ type databaseBackend struct { roleLocks []*locksutil.LockEntry } +func (b *databaseBackend) connGet(name string) *dbPluginInstance { + b.connLock.RLock() + defer b.connLock.RUnlock() + return b.connections[name] +} + +func (b *databaseBackend) connPop(name string) *dbPluginInstance { + b.connLock.Lock() + defer b.connLock.Unlock() + dbi := b.connections[name] + delete(b.connections, name) + return dbi +} + +func (b *databaseBackend) connPut(name string, newDbi *dbPluginInstance) *dbPluginInstance { + b.connLock.Lock() + defer b.connLock.Unlock() + dbi := b.connections[name] + b.connections[name] = newDbi + return dbi +} + +func (b *databaseBackend) connClear() map[string]*dbPluginInstance { + b.connLock.Lock() + defer b.connLock.Unlock() + old := b.connections + b.connections = make(map[string]*dbPluginInstance) + return old +} + func (b *databaseBackend) DatabaseConfig(ctx context.Context, s logical.Storage, name string) (*DatabaseConfig, error) { entry, err := s.Get(ctx, fmt.Sprintf("config/%s", name)) if err != nil { @@ -236,22 +268,8 @@ func (b *databaseBackend) GetConnection(ctx context.Context, s logical.Storage, } func (b *databaseBackend) GetConnectionWithConfig(ctx context.Context, name string, config *DatabaseConfig) (*dbPluginInstance, error) { - b.RLock() - unlockFunc := b.RUnlock - defer func() { unlockFunc() }() - - dbi, ok := b.connections[name] - if ok { - return dbi, nil - } - - // Upgrade lock - b.RUnlock() - b.Lock() - unlockFunc = b.Unlock - - dbi, ok = b.connections[name] - if ok { + dbi := b.connGet(name) + if dbi != nil { return dbi, nil } @@ -280,38 +298,23 @@ func (b *databaseBackend) GetConnectionWithConfig(ctx context.Context, name stri id: id, name: name, } - b.connections[name] = dbi - return dbi, nil -} - -// invalidateQueue cancels any background queue loading and destroys the queue. -func (b *databaseBackend) invalidateQueue() { - // cancel context before grabbing lock to start closing any open connections - // this is safe to do without the lock since it is only written to once in initialization - // and can be canceled multiple times safely - if b.cancelQueue != nil { - b.cancelQueue() + oldConn := b.connPut(name, dbi) + if oldConn != nil { + err := oldConn.Close() + if err != nil { + b.Logger().Warn("Error closing database connection", "error", err) + } } - b.Lock() - defer b.Unlock() - - b.credRotationQueue = nil + return dbi, nil } // ClearConnection closes the database connection and // removes it from the b.connections map. func (b *databaseBackend) ClearConnection(name string) error { - b.Lock() - defer b.Unlock() - return b.clearConnection(name) -} - -func (b *databaseBackend) clearConnection(name string) error { - db, ok := b.connections[name] - if ok { + db := b.connPop(name) + if db != nil { // Ignore error here since the database client is always killed db.Close() - delete(b.connections, name) } return nil } @@ -324,14 +327,18 @@ func (b *databaseBackend) CloseIfShutdown(db *dbPluginInstance, err error) { // and simply defer the unlock. Since we are attaching the instance and matching // the id in the connection map, we can safely do this. go func() { - b.Lock() - defer b.Unlock() db.Close() // Ensure we are deleting the correct connection - mapDB, ok := b.connections[db.name] - if ok && db.id == mapDB.id { - delete(b.connections, db.name) + mapDB := b.connPop(db.name) + if mapDB != nil && db.id != mapDB.id { + // oops, put it back + oldDbi := b.connPut(db.name, mapDB) + if oldDbi != nil { + // there is a small chance that something else was inserted in that slot during that time + // if so, clean it up + oldDbi.Close() + } } }() } @@ -339,18 +346,14 @@ func (b *databaseBackend) CloseIfShutdown(db *dbPluginInstance, err error) { // clean closes all connections from all database types // and cancels any rotation queue loading operation. -func (b *databaseBackend) clean(ctx context.Context) { - // invalidateQueue acquires it's own lock on the backend, removes queue, and - // terminates the background ticker - b.invalidateQueue() +func (b *databaseBackend) clean(_ context.Context) { + // kill the queue and terminate the background ticker + b.cancelQueue() - b.Lock() - defer b.Unlock() - - for _, db := range b.connections { - db.Close() + connections := b.connClear() + for _, db := range connections { + go db.Close() } - b.connections = make(map[string]*dbPluginInstance) } const backendHelp = ` diff --git a/builtin/logical/database/path_config_connection.go b/builtin/logical/database/path_config_connection.go index ac5a623d7363..cfc61a98b822 100644 --- a/builtin/logical/database/path_config_connection.go +++ b/builtin/logical/database/path_config_connection.go @@ -344,16 +344,14 @@ func (b *databaseBackend) connectionWriteHandler() framework.OperationFunc { b.Logger().Debug("created database object", "name", name, "plugin_name", config.PluginName) - b.Lock() - defer b.Unlock() - // Close and remove the old connection - b.clearConnection(name) - - b.connections[name] = &dbPluginInstance{ + oldConn := b.connPut(name, &dbPluginInstance{ database: dbw, name: name, id: id, + }) + if oldConn != nil { + oldConn.Close() } err = storeConfig(ctx, req.Storage, name, config) diff --git a/builtin/logical/database/path_rotate_credentials.go b/builtin/logical/database/path_rotate_credentials.go index 2b197d59e780..60183059f1c9 100644 --- a/builtin/logical/database/path_rotate_credentials.go +++ b/builtin/logical/database/path_rotate_credentials.go @@ -78,10 +78,6 @@ func (b *databaseBackend) pathRotateRootCredentialsUpdate() framework.OperationF return nil, err } - // Take out the backend lock since we are swapping out the connection - b.Lock() - defer b.Unlock() - // Take the write lock on the instance dbi.Lock() defer dbi.Unlock() @@ -93,7 +89,7 @@ func (b *databaseBackend) pathRotateRootCredentialsUpdate() framework.OperationF b.Logger().Error("error closing the database plugin connection", "err", err) } // Even on error, still remove the connection - delete(b.connections, name) + b.ClearConnection(name) }() generator, err := newPasswordGenerator(nil) diff --git a/builtin/logical/database/rotation.go b/builtin/logical/database/rotation.go index 665f2e8b0e84..d9afd86c9dfc 100644 --- a/builtin/logical/database/rotation.go +++ b/builtin/logical/database/rotation.go @@ -642,14 +642,11 @@ func (b *databaseBackend) loadStaticWALs(ctx context.Context, s logical.Storage) // actually available. This is needed because both runTicker and initQueue // operate in go-routines, and could be accessing the queue concurrently func (b *databaseBackend) pushItem(item *queue.Item) error { - b.RLock() - unlockFunc := b.RUnlock - defer func() { unlockFunc() }() - - if b.credRotationQueue != nil { + select { + case <-b.ctx.Done(): + default: return b.credRotationQueue.Push(item) } - b.Logger().Warn("no queue found during push item") return nil } @@ -658,9 +655,9 @@ func (b *databaseBackend) pushItem(item *queue.Item) error { // actually available. This is needed because both runTicker and initQueue // operate in go-routines, and could be accessing the queue concurrently func (b *databaseBackend) popFromRotationQueue() (*queue.Item, error) { - b.RLock() - defer b.RUnlock() - if b.credRotationQueue != nil { + select { + case <-b.ctx.Done(): + default: return b.credRotationQueue.Pop() } return nil, queue.ErrEmpty @@ -670,9 +667,9 @@ func (b *databaseBackend) popFromRotationQueue() (*queue.Item, error) { // actually available. This is needed because both runTicker and initQueue // operate in go-routines, and could be accessing the queue concurrently func (b *databaseBackend) popFromRotationQueueByKey(name string) (*queue.Item, error) { - b.RLock() - defer b.RUnlock() - if b.credRotationQueue != nil { + select { + case <-b.ctx.Done(): + default: item, err := b.credRotationQueue.PopByKey(name) if err != nil { return nil, err