Skip to content

Commit

Permalink
GOCBC-1626: Handle operations after close gracefully
Browse files Browse the repository at this point in the history
Motivation
----------
At the moment if cluster close is called twice or operations are performed
after close then the SDK can panic.

Changes
-------
Add logic to connection managers to fast fail any ops when a provider
is fetched and the manager is closed.
Add logic to return error when connection managers are closed twice.
Add provider controllers which are used with helper functions
which increment wait groups belonging to the connection managers.
Update all public functions that perform ops to first call the
contoller helper to get a provider.
Add wait groups to connection managers and block on them on close.

Change-Id: I17d16de1fa6aebfad09b267aaaebe2424be73893
Reviewed-on: https://review.couchbase.org/c/gocb/+/210690
Reviewed-by: Charles Dixon <[email protected]>
Tested-by: Build Bot <[email protected]>
Reviewed-by: Dimitris Christodoulou <[email protected]>
  • Loading branch information
chvck committed Jun 7, 2024
1 parent 55c90ba commit 708603f
Show file tree
Hide file tree
Showing 56 changed files with 2,181 additions and 2,214 deletions.
128 changes: 97 additions & 31 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ func (b *Bucket) getQueryProvider() (queryProvider, error) {
}

func (b *Bucket) getQueryIndexProvider() (queryIndexProvider, error) {
if b.bootstrapError != nil {
return nil, b.bootstrapError
}

provider, err := b.connectionManager.getQueryIndexProvider()
if err != nil {
return nil, err
Expand All @@ -108,6 +112,10 @@ func (b *Bucket) getQueryIndexProvider() (queryIndexProvider, error) {
}

func (b *Bucket) getSearchProvider() (searchProvider, error) {
if b.bootstrapError != nil {
return nil, b.bootstrapError
}

provider, err := b.connectionManager.getSearchProvider()
if err != nil {
return nil, err
Expand Down Expand Up @@ -139,6 +147,10 @@ func (b *Bucket) getAnalyticsProvider() (analyticsProvider, error) {
}

func (b *Bucket) getEventingManagementProvider() (eventingManagementProvider, error) {
if b.bootstrapError != nil {
return nil, b.bootstrapError
}

provider, err := b.connectionManager.getEventingManagementProvider()
if err != nil {
return nil, err
Expand All @@ -147,24 +159,84 @@ func (b *Bucket) getEventingManagementProvider() (eventingManagementProvider, er
return provider, nil
}

func (b *Bucket) getViewProvider(bucketName string) (viewProvider, error) {
provider, err := b.connectionManager.getViewProvider(bucketName)
func (b *Bucket) getViewProvider() (viewProvider, error) {
if b.bootstrapError != nil {
return nil, b.bootstrapError
}

provider, err := b.connectionManager.getViewProvider(b.Name())
if err != nil {
return nil, err
}

return provider, nil
}

func (b *Bucket) getViewIndexProvider(bucketName string) (viewIndexProvider, error) {
provider, err := b.connectionManager.getViewIndexProvider(bucketName)
func (b *Bucket) getViewIndexProvider() (viewIndexProvider, error) {
if b.bootstrapError != nil {
return nil, b.bootstrapError
}

provider, err := b.connectionManager.getViewIndexProvider(b.Name())
if err != nil {
return nil, err
}

return provider, nil
}

func (b *Bucket) getCollectionsManagementProvider() (collectionsManagementProvider, error) {
if b.bootstrapError != nil {
return nil, b.bootstrapError
}

provider, err := b.connectionManager.getCollectionsManagementProvider(b.Name())
if err != nil {
return nil, err
}

return provider, nil
}

func (b *Bucket) getDiagnosticsProvider() (diagnosticsProvider, error) {
provider, err := b.connectionManager.getDiagnosticsProvider(b.Name())
if err != nil {
return nil, err
}

return provider, nil
}

func (b *Bucket) getWaitUntilReadyProvider() (waitUntilReadyProvider, error) {
provider, err := b.connectionManager.getWaitUntilReadyProvider(b.Name())
if err != nil {
return nil, err
}

return provider, nil
}

func (b *Bucket) diagnosticsController() *providerController[diagnosticsProvider] {
return &providerController[diagnosticsProvider]{
get: b.getDiagnosticsProvider,
opController: b.connectionManager,
}
}

func (b *Bucket) viewController() *providerController[viewProvider] {
return &providerController[viewProvider]{
get: b.getViewProvider,
opController: b.connectionManager,
}
}

func (b *Bucket) waitUntilReadyController() *providerController[waitUntilReadyProvider] {
return &providerController[waitUntilReadyProvider]{
get: b.getWaitUntilReadyProvider,
opController: b.connectionManager,
}
}

// Name returns the name of the bucket.
func (b *Bucket) Name() string {
return b.bucketName
Expand Down Expand Up @@ -193,17 +265,19 @@ func (b *Bucket) DefaultCollection() *Collection {
// ViewIndexes returns a ViewIndexManager instance for managing views.
func (b *Bucket) ViewIndexes() *ViewIndexManager {
return &ViewIndexManager{
getProvider: func() (viewIndexProvider, error) {
return b.getViewIndexProvider(b.Name())
controller: &providerController[viewIndexProvider]{
get: b.getViewIndexProvider,
opController: b.connectionManager,
},
}
}

// CollectionsV2 provides functions for managing collections.
func (b *Bucket) CollectionsV2() *CollectionManagerV2 {
return &CollectionManagerV2{
getProvider: func() (collectionsManagementProvider, error) {
return b.connectionManager.getCollectionsManagementProvider(b.Name())
controller: &providerController[collectionsManagementProvider]{
get: b.getCollectionsManagementProvider,
opController: b.connectionManager,
},
}
}
Expand All @@ -225,27 +299,19 @@ func (b *Bucket) Collections() *CollectionManager {
// Valid service types are: ServiceTypeKeyValue, ServiceTypeManagement, ServiceTypeQuery, ServiceTypeSearch,
// ServiceTypeAnalytics, ServiceTypeViews.
func (b *Bucket) WaitUntilReady(timeout time.Duration, opts *WaitUntilReadyOptions) error {
if opts == nil {
opts = &WaitUntilReadyOptions{}
}

if b.bootstrapError != nil {
return b.bootstrapError
}

provider, err := b.connectionManager.getWaitUntilReadyProvider(b.bucketName)
if err != nil {
return err
}

err = provider.WaitUntilReady(
opts.Context,
time.Now().Add(timeout),
opts,
)
if err != nil {
return err
}

return nil
return autoOpControlErrorOnly(b.waitUntilReadyController(), func(provider waitUntilReadyProvider) error {
if opts == nil {
opts = &WaitUntilReadyOptions{}
}

if b.bootstrapError != nil {
return b.bootstrapError
}

return provider.WaitUntilReady(
opts.Context,
time.Now().Add(timeout),
opts,
)
})
}
15 changes: 9 additions & 6 deletions bucket_collectionsmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1254,12 +1254,15 @@ func (suite *UnitTestSuite) runGetAllScopesMgmtRequestFailsTest(v2 bool) {
provider.On("executeMgmtRequest", nil, mock.AnythingOfType("gocb.mgmtRequest")).Return(nil, errors.New("http send failure"))

mgrV2 := CollectionManagerV2{
getProvider: func() (collectionsManagementProvider, error) {
return &collectionsManagementProviderCore{
mgmtProvider: provider,
tracer: &NoopTracer{},
meter: &meterWrapper{meter: &NoopMeter{}, isNoopMeter: true},
}, nil
controller: &providerController[collectionsManagementProvider]{
get: func() (collectionsManagementProvider, error) {
return &collectionsManagementProviderCore{
mgmtProvider: provider,
tracer: &NoopTracer{},
meter: &meterWrapper{meter: &NoopMeter{}, isNoopMeter: true},
}, nil
},
opController: mockOpController{},
},
}

Expand Down
Loading

0 comments on commit 708603f

Please sign in to comment.