From 5ef676e489623d7217afa53457fd67eda607e8a1 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Mon, 23 Oct 2023 17:16:03 -0400 Subject: [PATCH] CBG-3553 add bucket.Close() when opening the database (#6546) * CBG-3553 add bucket.Close() when opening the database - close the DatabaseContext if it's not managed by the database if it hasn't been added to - changed newSequenceAllocator to not leak a goroutine * Don't create db automatically * Improve error message * Don't close dbcontext if already registered * Remove test which did not test anything --- db/sequence_allocator.go | 7 +++++-- rest/admin_api.go | 2 +- rest/server_context.go | 23 ++++++++++++++++++++--- 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/db/sequence_allocator.go b/db/sequence_allocator.go index 6fd06d6e0c..84c50d363d 100644 --- a/db/sequence_allocator.go +++ b/db/sequence_allocator.go @@ -71,11 +71,14 @@ func newSequenceAllocator(ctx context.Context, datastore base.DataStore, dbStats // The reserveNotify channel manages communication between the releaseSequenceMonitor goroutine and _reserveSequenceRange invocations. s.reserveNotify = make(chan struct{}, 1) + _, err := s.lastSequence(ctx) // just reads latest sequence from bucket + if err != nil { + return nil, err + } go func() { defer base.FatalPanicHandler() s.releaseSequenceMonitor(ctx) }() - _, err := s.lastSequence(ctx) // just reads latest sequence from bucket return s, err } @@ -152,7 +155,7 @@ func (s *sequenceAllocator) lastSequence(ctx context.Context) (uint64, error) { s.dbStats.SequenceGetCount.Add(1) last, err := s.getSequence() if err != nil { - base.WarnfCtx(ctx, "Error from Get in getSequence(): %v", err) + return 0, fmt.Errorf("Couldn't get sequence from bucket: %w", err) } return last, err } diff --git a/rest/admin_api.go b/rest/admin_api.go index 079c84cf28..f964030901 100644 --- a/rest/admin_api.go +++ b/rest/admin_api.go @@ -170,7 +170,7 @@ func (h *handler) handleCreateDB() error { if errors.Is(err, base.ErrAuthError) { return base.HTTPErrorf(http.StatusForbidden, "auth failure using provided bucket credentials for database %s", base.MD(config.Name)) } - return err + return base.HTTPErrorf(http.StatusInternalServerError, "couldn't load database: %v", err) } } diff --git a/rest/server_context.go b/rest/server_context.go index 995a154eb8..0d4f00f33c 100644 --- a/rest/server_context.go +++ b/rest/server_context.go @@ -498,7 +498,9 @@ func GetBucketSpec(ctx context.Context, config *DatabaseConfig, serverConfig *St // lock to see if it's already been added by another process. If so, returns either the // existing DatabaseContext or an error based on the useExisting flag. // Pass in a bucketFromBucketSpecFn to replace the default ConnectToBucket function. This will cause the failFast argument to be ignored -func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config DatabaseConfig, options getOrAddDatabaseConfigOptions) (*db.DatabaseContext, error) { +func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config DatabaseConfig, options getOrAddDatabaseConfigOptions) (dbcontext *db.DatabaseContext, returnedError error) { + var bucket base.Bucket + // Generate bucket spec and validate whether db already exists spec, err := GetBucketSpec(ctx, &config, sc.Config) if err != nil { @@ -509,6 +511,22 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config if dbName == "" { dbName = spec.BucketName } + defer func() { + if returnedError == nil { + return + } + // database exists in global map, management is deferred to REST api + _, dbRegistered := sc.databases_[dbName] + if dbRegistered { + return + } + if dbcontext != nil { + dbcontext.Close(ctx) // will close underlying bucket + } else if bucket != nil { + bucket.Close(ctx) + } + }() + if spec.Server == "" { spec.Server = sc.Config.Bootstrap.Server } @@ -549,7 +567,6 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config base.MD(dbName), base.MD(spec.BucketName), base.SD(base.DefaultPool), base.SD(spec.Server)) // the connectToBucketFn is used for testing seam - var bucket base.Bucket if options.connectToBucketFn != nil { // the connectToBucketFn is used for testing seam bucket, err = options.connectToBucketFn(ctx, spec, options.failFast) @@ -841,7 +858,7 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config contextOptions.BlipStatsReportingInterval = defaultBytesStatsReportingInterval.Milliseconds() // Create the DB Context - dbcontext, err := db.NewDatabaseContext(ctx, dbName, bucket, autoImport, contextOptions) + dbcontext, err = db.NewDatabaseContext(ctx, dbName, bucket, autoImport, contextOptions) if err != nil { return nil, err }