diff --git a/base/bucket.go b/base/bucket.go
index dfe23ae88a..d49acb513d 100644
--- a/base/bucket.go
+++ b/base/bucket.go
@@ -30,11 +30,6 @@ import (
 	pkgerrors "github.com/pkg/errors"
 )
 
-const (
-	TapFeedType = "tap"
-	DcpFeedType = "dcp"
-)
-
 const (
 	DefaultPool = "default"
 )
@@ -92,12 +87,6 @@ func GetBaseDataStore(ds DataStore) DataStore {
 	return ds
 }
 
-// AsDataStoreName is a temporary thing until DataStoreName is implemented on wrappers (pending further design work on FQName...)
-func AsDataStoreName(ds DataStore) (sgbucket.DataStoreName, bool) {
-	dsn, ok := GetBaseDataStore(ds).(sgbucket.DataStoreName)
-	return dsn, ok
-}
-
 func init() {
 	// Increase max memcached request size to 20M bytes, to support large docs (attachments!)
 	// arriving in a tap feed. (see issues #210, #333, #342)
@@ -372,24 +361,6 @@ func IsCasMismatch(err error) bool {
 	return false
 }
 
-// Returns mutation feed type for bucket. Will first return the feed type from the spec, when present. If not found, returns default feed type for bucket
-// (DCP for any couchbase bucket, TAP otherwise)
-func GetFeedType(bucket Bucket) (feedType string) {
-	switch typedBucket := bucket.(type) {
-	case *GocbV2Bucket:
-		return DcpFeedType
-	case sgbucket.MutationFeedStore2:
-		return string(typedBucket.GetFeedType())
-	case *LeakyBucket:
-		return GetFeedType(typedBucket.bucket)
-	case *TestBucket:
-		return GetFeedType(typedBucket.Bucket)
-	default:
-		// unknown bucket type?
-		return TapFeedType
-	}
-}
-
 // Gets the bucket max TTL, or 0 if no TTL was set. Sync gateway should fail to bring the DB online if this is non-zero,
 // since it's not meant to operate against buckets that auto-delete data.
 func getMaxTTL(ctx context.Context, store CouchbaseBucketStore) (int, error) {
diff --git a/base/collection.go b/base/collection.go
index 994c3d0735..475b0e2ab3 100644
--- a/base/collection.go
+++ b/base/collection.go
@@ -289,10 +289,6 @@ func (b *GocbV2Bucket) StartDCPFeed(ctx context.Context, args sgbucket.FeedArgum
 	return StartGocbDCPFeed(ctx, b, b.Spec.BucketName, args, callback, dbStats, DCPMetadataStoreInMemory, groupID)
 }
 
-func (b *GocbV2Bucket) StartTapFeed(args sgbucket.FeedArguments, dbStats *expvar.Map) (sgbucket.MutationFeed, error) {
-	return nil, errors.New("StartTapFeed not implemented")
-}
-
 func (b *GocbV2Bucket) GetStatsVbSeqno(maxVbno uint16, useAbsHighSeqNo bool) (uuids map[uint16]uint64, highSeqnos map[uint16]uint64, seqErr error) {
 
 	agent, agentErr := b.getGoCBAgent()
@@ -721,14 +717,3 @@ func (b *GocbV2Bucket) ServerMetrics(ctx context.Context) (map[string]*dto.Metri
 
 	return mf, nil
 }
-
-func GetCollectionID(dataStore DataStore) uint32 {
-	switch c := dataStore.(type) {
-	case WrappingDatastore:
-		return GetCollectionID(c.GetUnderlyingDataStore())
-	case sgbucket.Collection:
-		return c.GetCollectionID()
-	default:
-		return DefaultCollectionID
-	}
-}
diff --git a/base/leaky_bucket.go b/base/leaky_bucket.go
index 988f86be3b..f34eda63a8 100644
--- a/base/leaky_bucket.go
+++ b/base/leaky_bucket.go
@@ -13,8 +13,6 @@ import (
 	"context"
 	"expvar"
 	"fmt"
-	"math"
-	"time"
 
 	sgbucket "github.com/couchbase/sg-bucket"
 )
@@ -123,12 +121,7 @@ type LeakyBucketConfig struct {
 	DDocDeleteErrorCount int
 	DDocGetErrorCount    int
 
-	// Emulate TAP/DCP feed de-dupliation behavior, such that within a
-	// window of # of mutations or a timeout, mutations for a given document
-	// will be filtered such that only the _latest_ mutation will make it through.
-	TapFeedDeDuplication bool
-	TapFeedVbuckets      bool     // Emulate vbucket numbers on feed
-	TapFeedMissingDocs   []string // Emulate entry not appearing on tap feed
+	DCPFeedMissingDocs []string // Emulate entry not appearing on DCP feed
 
 	ForceErrorSetRawKeys []string // Issuing a SetRaw call with a specified key will return an error
@@ -161,209 +154,17 @@ type LeakyBucketConfig struct {
 	IgnoreClose bool
 }
 
-func (b *LeakyBucket) StartTapFeed(args sgbucket.FeedArguments, dbStats *expvar.Map) (sgbucket.MutationFeed, error) {
-
-	if b.config.TapFeedDeDuplication {
-		return b.wrapFeedForDeduplication(args, dbStats)
-	} else if len(b.config.TapFeedMissingDocs) > 0 {
-		callback := func(event *sgbucket.FeedEvent) bool {
-			for _, key := range b.config.TapFeedMissingDocs {
+func (b *LeakyBucket) StartDCPFeed(ctx context.Context, args sgbucket.FeedArguments, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map) error {
+	if len(b.config.DCPFeedMissingDocs) > 0 {
+		wrappedCallback := func(event sgbucket.FeedEvent) bool {
+			for _, key := range b.config.DCPFeedMissingDocs {
 				if string(event.Key) == key {
 					return false
 				}
 			}
-			return true
-		}
-		return b.wrapFeed(args, callback, dbStats)
-	} else if b.config.TapFeedVbuckets {
-		// kick off the wrapped sgbucket tap feed
-		walrusTapFeed, err := b.bucket.StartTapFeed(args, dbStats)
-		if err != nil {
-			return walrusTapFeed, err
-		}
-		// this is the sgbucket.MutationFeed impl we'll return to callers, which
-		// will add vbucket information
-		channel := make(chan sgbucket.FeedEvent, 10)
-		vbTapFeed := &wrappedTapFeedImpl{
-			channel:        channel,
-			wrappedTapFeed: walrusTapFeed,
+			return callback(event)
 		}
-		go func() {
-			for event := range walrusTapFeed.Events() {
-				key := string(event.Key)
-				event.VbNo = uint16(sgbucket.VBHash(key, 1024))
-				vbTapFeed.channel <- event
-			}
-			close(vbTapFeed.channel)
-		}()
-		return vbTapFeed, nil
-
-	} else {
-		return b.bucket.StartTapFeed(args, dbStats)
+		return b.bucket.StartDCPFeed(ctx, args, wrappedCallback, dbStats)
 	}
-
-}
-
-func (b *LeakyBucket) StartDCPFeed(ctx context.Context, args sgbucket.FeedArguments, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map) error {
 	return b.bucket.StartDCPFeed(ctx, args, callback, dbStats)
 }
-
-type EventUpdateFunc func(event *sgbucket.FeedEvent) bool
-
-func (b *LeakyBucket) wrapFeed(args sgbucket.FeedArguments, callback EventUpdateFunc, dbStats *expvar.Map) (sgbucket.MutationFeed, error) {
-
-	// kick off the wrapped sgbucket tap feed
-	walrusTapFeed, err := b.bucket.StartTapFeed(args, dbStats)
-	if err != nil {
-		return walrusTapFeed, err
-	}
-
-	// create an output channel
-	channel := make(chan sgbucket.FeedEvent, 10)
-
-	// this is the sgbucket.MutationFeed impl we'll return to callers, which
-	// will have missing entries
-	wrapperFeed := &wrappedTapFeedImpl{
-		channel:        channel,
-		wrappedTapFeed: walrusTapFeed,
-	}
-
-	go func() {
-		for event := range walrusTapFeed.Events() {
-			// Callback returns false if the event should be skipped
-			if callback(&event) {
-				wrapperFeed.channel <- event
-			}
-		}
-		close(wrapperFeed.channel)
-	}()
-	return wrapperFeed, nil
-}
-
-func (b *LeakyBucket) wrapFeedForDeduplication(args sgbucket.FeedArguments, dbStats *expvar.Map) (sgbucket.MutationFeed, error) {
-	// create an output channel
-	// start a goroutine which reads off the sgbucket tap feed
-	//   - de-duplicate certain events
-	//   - puts them to output channel
-
-	// the number of changes that it will buffer up before de-duplicating
-	deDuplicationWindowSize := 5
-
-	// the timeout window in milliseconds after which it will flush to output, even if
-	// the dedupe buffer has not filled up yet.
-	deDuplicationTimeoutMs := time.Millisecond * 1000
-
-	// kick off the wrapped sgbucket tap feed
-	walrusTapFeed, err := b.bucket.StartTapFeed(args, dbStats)
-	if err != nil {
-		return walrusTapFeed, err
-	}
-
-	// create an output channel for de-duplicated events
-	channel := make(chan sgbucket.FeedEvent, 10)
-
-	// this is the sgbucket.MutationFeed impl we'll return to callers, which
-	// will reead from the de-duplicated events channel
-	dupeTapFeed := &wrappedTapFeedImpl{
-		channel:        channel,
-		wrappedTapFeed: walrusTapFeed,
-	}
-
-	go func() {
-		defer close(dupeTapFeed.channel)
-		// the buffer to hold tap events that are candidates for de-duplication
-		deDupeBuffer := []sgbucket.FeedEvent{}
-
-		timer := time.NewTimer(math.MaxInt64)
-		for {
-			select {
-			case tapEvent, ok := <-walrusTapFeed.Events():
-				if !ok {
-					// channel closed, goroutine is done
-					// dedupe and send what we currently have
-					dedupeAndForward(deDupeBuffer, channel)
-					return
-				}
-				deDupeBuffer = append(deDupeBuffer, tapEvent)
-
-				// if we've collected enough, dedeupe and send what we have,
-				// and reset buffer.
-				if len(deDupeBuffer) >= deDuplicationWindowSize {
-					dedupeAndForward(deDupeBuffer, channel)
-					deDupeBuffer = []sgbucket.FeedEvent{}
-				} else {
-					_ = timer.Reset(deDuplicationTimeoutMs)
-				}
-
-			case <-timer.C:
-				if len(deDupeBuffer) > 0 {
-					// give up on waiting for the buffer to fill up,
-					// de-dupe and send what we currently have
-					dedupeAndForward(deDupeBuffer, channel)
-					deDupeBuffer = []sgbucket.FeedEvent{}
-				}
-			}
-		}
-
-	}()
-	return dupeTapFeed, nil
-}
-
-// An implementation of a sgbucket tap feed that wraps
-// tap events on the upstream tap feed to better emulate real world
-// TAP/DCP behavior.
-type wrappedTapFeedImpl struct {
-	channel        chan sgbucket.FeedEvent
-	wrappedTapFeed sgbucket.MutationFeed
-}
-
-func (feed *wrappedTapFeedImpl) Close() error {
-	return feed.wrappedTapFeed.Close()
-}
-
-func (feed *wrappedTapFeedImpl) Events() <-chan sgbucket.FeedEvent {
-	return feed.channel
-}
-
-func (feed *wrappedTapFeedImpl) WriteEvents() chan<- sgbucket.FeedEvent {
-	return feed.channel
-}
-
-func dedupeAndForward(tapEvents []sgbucket.FeedEvent, destChannel chan<- sgbucket.FeedEvent) {
-
-	deduped := dedupeTapEvents(tapEvents)
-
-	for _, tapEvent := range deduped {
-		destChannel <- tapEvent
-	}
-
-}
-
-func dedupeTapEvents(tapEvents []sgbucket.FeedEvent) []sgbucket.FeedEvent {
-
-	// For each document key, keep track of the latest seen tapEvent
-	// doc1 -> tapEvent with Seq=1
-	// doc2 -> tapEvent with Seq=5
-	// (if tapEvent with Seq=7 comes in for doc1, it will clobber existing)
-	latestTapEventPerKey := map[string]sgbucket.FeedEvent{}
-
-	for _, tapEvent := range tapEvents {
-		key := string(tapEvent.Key)
-		latestTapEventPerKey[key] = tapEvent
-	}
-
-	// Iterate over the original tapEvents, and only keep what
-	// is in latestTapEventPerKey, and discard all previous mutations
-	// of that doc. This will preserve the original
-	// sequence order as read off the feed.
-	deduped := []sgbucket.FeedEvent{}
-	for _, tapEvent := range tapEvents {
-		latestTapEventForKey := latestTapEventPerKey[string(tapEvent.Key)]
-		if tapEvent.Cas == latestTapEventForKey.Cas {
-			deduped = append(deduped, tapEvent)
-		}
-	}
-
-	return deduped
-
-}
diff --git a/base/leaky_bucket_test.go b/base/leaky_bucket_test.go
deleted file mode 100644
index cf3519e94f..0000000000
--- a/base/leaky_bucket_test.go
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
-Copyright 2016-Present Couchbase, Inc.
-
-Use of this software is governed by the Business Source License included in
-the file licenses/BSL-Couchbase.txt. As of the Change Date specified in that
-file, in accordance with the Business Source License, use of this software will
-be governed by the Apache License, Version 2.0, included in the file
-licenses/APL2.txt.
-*/
-
-package base
-
-import (
-	"testing"
-
-	sgbucket "github.com/couchbase/sg-bucket"
-	"github.com/stretchr/testify/assert"
-)
-
-func TestDedupeTapEventsLaterSeqSameDoc(t *testing.T) {
-
-	tapEvents := []sgbucket.FeedEvent{
-		{
-			Opcode: sgbucket.FeedOpMutation,
-			Key:    []byte("doc1"),
-			Value:  []byte(`".."`),
-			Cas:    1,
-		},
-		{
-			Opcode: sgbucket.FeedOpMutation,
-			Key:    []byte("doc1"),
-			Value:  []byte(`".."`),
-			Cas:    2,
-		},
-	}
-
-	deduped := dedupeTapEvents(tapEvents)
-
-	// make sure that one was deduped
-	assert.Len(t, deduped, 1)
-
-	// make sure the earlier event was deduped
-	dedupedEvent := deduped[0]
-	assert.True(t, dedupedEvent.Cas == 2)
-
-}
-
-func TestDedupeNoDedupeDifferentDocs(t *testing.T) {
-
-	tapEvents := []sgbucket.FeedEvent{
-		{
-			Opcode: sgbucket.FeedOpMutation,
-			Key:    []byte("doc1"),
-			Value:  []byte(`".."`),
-			Cas:    1,
-		},
-		{
-			Opcode: sgbucket.FeedOpMutation,
-			Key:    []byte("doc2"),
-			Value:  []byte(`".."`),
-			Cas:    2,
-		},
-	}
-
-	deduped := dedupeTapEvents(tapEvents)
-
-	// make sure that nothing was deduped
-	assert.True(t, len(deduped) == 2)
-
-}
diff --git a/base/leaky_datastore.go b/base/leaky_datastore.go
index ba1b581a43..b252d7cedf 100644
--- a/base/leaky_datastore.go
+++ b/base/leaky_datastore.go
@@ -67,12 +67,16 @@ func (lds *LeakyDataStore) GetName() string {
 	return lds.dataStore.GetName()
 }
 
+func (lds *LeakyDataStore) ScopeName() string {
+	return lds.dataStore.ScopeName()
+}
+
+func (lds *LeakyDataStore) CollectionName() string {
+	return lds.dataStore.CollectionName()
+}
+
 func (lds *LeakyDataStore) GetCollectionID() uint32 {
-	if coll, ok := lds.dataStore.(sgbucket.Collection); ok {
-		return coll.GetCollectionID()
-	} else {
-		return DefaultCollectionID
-	}
+	return lds.dataStore.GetCollectionID()
 }
 
 func (lds *LeakyDataStore) Get(k string, rv interface{}) (cas uint64, err error) {
diff --git a/base/main_test_bucket_pool.go b/base/main_test_bucket_pool.go
index 8cab3f3c2d..5697f770ac 100644
--- a/base/main_test_bucket_pool.go
+++ b/base/main_test_bucket_pool.go
@@ -222,7 +222,7 @@ func (tbp *TestBucketPool) GetWalrusTestBucket(t testing.TB, url string) (b Buck
 	}
 
-	// Wrap Walrus buckets with a leaky bucket to support vbucket IDs on feed.
-	b = &LeakyBucket{bucket: walrusBucket, config: &LeakyBucketConfig{TapFeedVbuckets: true}}
+	// Wrap Walrus buckets with a leaky bucket so tests can layer leaky bucket behaviour on top.
+	b = &LeakyBucket{bucket: walrusBucket, config: &LeakyBucketConfig{}}
 
 	ctx := bucketCtx(testCtx, b)
 	tbp.Logf(ctx, "Creating new %s test bucket", typeName)
diff --git a/db/change_cache_test.go b/db/change_cache_test.go
index d1bfdf4be2..0d1afbfaf5 100644
--- a/db/change_cache_test.go
+++ b/db/change_cache_test.go
@@ -504,10 +504,6 @@ func WriteUserDirect(t *testing.T, db *Database, username string, sequence uint6
 
 func WriteDirectWithKey(t *testing.T, db *Database, key string, channelArray []string, sequence uint64) {
 
-	if base.TestUseXattrs() {
-		panic(fmt.Sprintf("WriteDirectWithKey() cannot be used in tests that are xattr enabled"))
-	}
-
 	rev := "1-a"
 	chanMap := make(map[string]*channels.ChannelRemoval, 10)
 
@@ -521,8 +517,15 @@ func WriteDirectWithKey(t *testing.T, db *Database, key string, channelArray []s
 		Channels:  chanMap,
 		TimeSaved: time.Now(),
 	}
+	body := fmt.Sprintf(`{"key": "%s"}`, key)
 	collection := GetSingleDatabaseCollectionWithUser(t, db)
-	_, _ = collection.dataStore.Add(key, 0, Body{base.SyncPropertyName: syncData, "key": key})
+	if base.TestUseXattrs() {
+		_, err := collection.dataStore.WriteWithXattrs(base.TestCtx(t), key, 0, 0, []byte(body), map[string][]byte{base.SyncXattrName: base.MustJSONMarshal(t, syncData)}, nil)
+		require.NoError(t, err)
+	} else {
+		_, err := collection.dataStore.Add(key, 0, Body{base.SyncPropertyName: syncData, "key": key})
+		require.NoError(t, err)
+	}
 }
 
 // Create a document directly to the bucket with specific _sync metadata - used for
@@ -1304,10 +1307,6 @@ func TestStopChangeCache(t *testing.T) {
 
 	base.SetUpTestLogging(t, base.LevelDebug, base.KeyChanges, base.KeyDCP)
 
-	if base.TestUseXattrs() {
-		t.Skip("This test does not work with XATTRs due to calling WriteDirect(). Skipping.")
-	}
-
 	// Setup short-wait cache to ensure cleanup goroutines fire often
 	cacheOptions := DefaultCacheOptions()
 	cacheOptions.CachePendingSeqMaxWait = 10 * time.Millisecond
@@ -1316,7 +1315,7 @@
 
-	// Use leaky bucket to have the tap feed 'lose' document 3
+	// Use leaky bucket to have the DCP feed 'lose' document 3
 	leakyConfig := base.LeakyBucketConfig{
-		TapFeedMissingDocs: []string{"doc-3"},
+		DCPFeedMissingDocs: []string{"doc-3"},
 	}
 	db, ctx := setupTestLeakyDBWithCacheOptions(t, cacheOptions, leakyConfig)
diff --git a/db/change_listener.go b/db/change_listener.go
index a9041fb4ad..8316cdad54 100644
--- a/db/change_listener.go
+++ b/db/change_listener.go
@@ -13,7 +13,6 @@ package db
 import (
 	"context"
 	"expvar"
-	"fmt"
 	"math"
 	"strings"
 	"sync"
@@ -77,18 +76,13 @@ func (listener *changeListener) Start(ctx context.Context, bucket base.Bucket, d
 
 	// build the set of collections to be requested
 
 	// Add the metadata collection first
-	metadataStoreName, ok := base.AsDataStoreName(metadataStore)
-	if !ok {
-		return fmt.Errorf("changeListener started with collections, but unable to retrieve metadata store name for %T", metadataStore)
-	}
-
 	metadataStoreFoundInScopes := false
 	scopeArgs := make(map[string][]string)
 	for scopeName, scope := range scopes {
 		collections := make([]string, 0)
 		for collectionName, _ := range scope.Collections {
 			collections = append(collections, collectionName)
-			if scopeName == metadataStoreName.ScopeName() && collectionName == metadataStoreName.CollectionName() {
+			if scopeName == metadataStore.ScopeName() && collectionName == metadataStore.CollectionName() {
 				metadataStoreFoundInScopes = true
 			}
 		}
@@ -97,11 +91,11 @@ func (listener *changeListener) Start(ctx context.Context, bucket base.Bucket, d
 
 	// If the metadataStore's collection isn't already present in the list of scopes, add it to the DCP scopes
 	if !metadataStoreFoundInScopes {
-		_, ok = scopeArgs[metadataStoreName.ScopeName()]
+		_, ok := scopeArgs[metadataStore.ScopeName()]
 		if !ok {
-			scopeArgs[metadataStoreName.ScopeName()] = []string{metadataStoreName.CollectionName()}
+			scopeArgs[metadataStore.ScopeName()] = []string{metadataStore.CollectionName()}
 		} else {
-			scopeArgs[metadataStoreName.ScopeName()] = append(scopeArgs[metadataStoreName.ScopeName()], metadataStoreName.CollectionName())
+			scopeArgs[metadataStore.ScopeName()] = append(scopeArgs[metadataStore.ScopeName()], metadataStore.CollectionName())
 		}
 	}
 	listener.FeedArgs.Scopes = scopeArgs
@@ -118,38 +112,10 @@ func (listener *changeListener) StartMutationFeed(ctx context.Context, bucket ba
 		}
 	}()
 
-	// Uses DCP by default, unless TAP is explicitly specified
-	feedType := base.GetFeedType(bucket)
-	switch feedType {
-	case base.TapFeedType:
-		// TAP Feed
-		// TAP feed is a go-channel of Tap events served by the bucket. Start the feed, then
-		// start a goroutine to work the event channel, calling ProcessEvent for each event
-		var err error
-		listener.tapFeed, err = bucket.StartTapFeed(listener.FeedArgs, dbStats)
-		if err != nil {
-			return err
-		}
-		go func() {
-			defer func() {
-				if listener.FeedArgs.DoneChan != nil {
-					close(listener.FeedArgs.DoneChan)
-				}
-			}()
-			defer base.FatalPanicHandler()
-			defer listener.notifyStopping(ctx)
-			for event := range listener.tapFeed.Events() {
-				event.TimeReceived = time.Now()
-				listener.ProcessFeedEvent(event)
-			}
-		}()
-		return nil
-	default:
-		// DCP Feed
-		// DCP receiver isn't go-channel based - DCPReceiver calls ProcessEvent directly.
-		base.InfofCtx(ctx, base.KeyDCP, "Using DCP feed for bucket: %q (based on feed_type specified in config file)", base.MD(bucket.GetName()))
-		return bucket.StartDCPFeed(ctx, listener.FeedArgs, listener.ProcessFeedEvent, dbStats)
-	}
+	// DCP Feed
+	// DCP receiver isn't go-channel based - DCPReceiver calls ProcessEvent directly.
+	base.InfofCtx(ctx, base.KeyDCP, "Using DCP feed for bucket: %q", base.MD(bucket.GetName()))
+	return bucket.StartDCPFeed(ctx, listener.FeedArgs, listener.ProcessFeedEvent, dbStats)
 }
 
-// ProcessFeedEvent is invoked for each mutate or delete event seen on the server's mutation feed (TAP or DCP). Uses document
+// ProcessFeedEvent is invoked for each mutate or delete event seen on the server's mutation feed (DCP). Uses document
@@ -278,15 +244,6 @@ func (listener *changeListener) NotifyCheckForTermination(ctx context.Context, k
 	listener.tapNotifier.L.Unlock()
 }
 
-func (listener *changeListener) notifyStopping(ctx context.Context) {
-	listener.tapNotifier.L.Lock()
-	listener.counter = 0
-	listener.keyCounts = map[string]uint64{}
-	base.DebugfCtx(ctx, base.KeyChanges, "Notifying that changeListener is stopping")
-	listener.tapNotifier.Broadcast()
-	listener.tapNotifier.L.Unlock()
-}
-
 // Waits until either the counter, or terminateCheckCounter exceeds the given value. Returns the new counters.
 func (listener *changeListener) Wait(ctx context.Context, keys []string, counter uint64, terminateCheckCounter uint64) (uint64, uint64) {
 	listener.tapNotifier.L.Lock()
diff --git a/db/crud.go b/db/crud.go
index 57925d6f60..ac003a0dfb 100644
--- a/db/crud.go
+++ b/db/crud.go
@@ -1607,7 +1607,7 @@ func (db *DatabaseContext) assignSequence(ctx context.Context, docSequence uint6
 	doc.Sequence = docSequence
 	doc.UnusedSequences = unusedSequences
 
-	// The server TAP/DCP feed will deduplicate multiple revisions for the same doc if they occur in
+	// The server DCP feed will deduplicate multiple revisions for the same doc if they occur in
 	// the same mutation queue processing window. This results in missing sequences on the change listener.
 	// To account for this, we track the recent sequence numbers for the document.
 	if doc.RecentSequences == nil {
diff --git a/db/database.go b/db/database.go
index c6df802c70..8508e4bbf3 100644
--- a/db/database.go
+++ b/db/database.go
@@ -747,14 +747,7 @@ func (dbCtx *DatabaseContext) RemoveObsoleteIndexes(ctx context.Context, preview
 	var errs *base.MultiError
 	var removedIndexes []string
 	for _, dataStore := range dbCtx.getDataStores() {
-		dsName, ok := base.AsDataStoreName(dataStore)
-		if !ok {
-			err := fmt.Sprintf("Cannot get datastore name from %s", dataStore)
-			base.WarnfCtx(ctx, err)
-			errs = errs.Append(errors.New(err))
-			continue
-		}
-		collectionName := fmt.Sprintf("`%s`.`%s`", dsName.ScopeName(), dsName.CollectionName())
+		collectionName := fmt.Sprintf("`%s`.`%s`", dataStore.ScopeName(), dataStore.CollectionName())
 		n1qlStore, ok := base.AsN1QLStore(dataStore)
 		if !ok {
 			err := fmt.Sprintf("Cannot remove obsolete indexes for non-gocb collection %s - skipping.", base.MD(collectionName))
diff --git a/db/database_collection.go b/db/database_collection.go
index 75f4248ca2..f095495f9c 100644
--- a/db/database_collection.go
+++ b/db/database_collection.go
@@ -43,19 +43,14 @@ func newDatabaseCollection(ctx context.Context, dbContext *DatabaseContext, data
 		dataStore:       dataStore,
 		dbCtx:           dbContext,
 		collectionStats: stats,
+		ScopeName:       dataStore.ScopeName(),
+		Name:            dataStore.CollectionName(),
 	}
 	dbCollection.revisionCache = NewRevisionCache(
 		dbContext.Options.RevisionCacheOptions,
 		dbCollection,
 		dbContext.DbStats.Cache(),
 	)
-	if metadataStoreName, ok := base.AsDataStoreName(dataStore); ok {
-		dbCollection.ScopeName = metadataStoreName.ScopeName()
-		dbCollection.Name = metadataStoreName.CollectionName()
-	} else {
-		dbCollection.ScopeName = base.DefaultScope
-		dbCollection.Name = base.DefaultCollection
-	}
 	return dbCollection, nil
 }
 
@@ -141,8 +136,7 @@ func (c *DatabaseCollection) exitChanges() chan struct{} {
 
 // GetCollectionID returns a collectionID. If couchbase server does not return collections, it will return base.DefaultCollectionID, like the default collection for a Couchbase Server that does support collections.
 func (c *DatabaseCollection) GetCollectionID() uint32 {
-	ds := base.GetBaseDataStore(c.dataStore)
-	return base.GetCollectionID(ds)
+	return c.dataStore.GetCollectionID()
 }
 
 // GetRevisionCacheForTest allow accessing a copy of revision cache.
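The removals of `base.AsDataStoreName` and `base.GetCollectionID` above (and in the test files that follow) rely on the sg-bucket bump in go.mod: the name accessors are assumed to be part of the `DataStore` interface itself, so wrappers such as `LeakyDataStore` delegate instead of type-asserting. A minimal sketch of the before/after shape, with illustrative interface definitions rather than the library's verbatim ones:

```go
// Sketch only: interface shapes assumed from the sg-bucket dependency bump.
package example

import "fmt"

// DataStoreName carries the fully-qualified collection name.
type DataStoreName interface {
	ScopeName() string
	CollectionName() string
}

// DataStore is assumed to embed DataStoreName after this change, so every
// datastore (including wrappers) exposes its name directly.
type DataStore interface {
	DataStoreName
	GetName() string
	GetCollectionID() uint32
}

// Before: callers needed a checked type assertion that could fail at runtime:
//   dsName, ok := base.AsDataStoreName(ds)
//   if !ok { /* error path */ }
// After: the name is always available, and each call site drops an error path.
func keyspace(dbName string, ds DataStore) string {
	return fmt.Sprintf("%s.%s.%s", dbName, ds.ScopeName(), ds.CollectionName())
}
```

That guaranteed availability is what lets every caller below delete its `require.True(t, ok)` / error-handling boilerplate.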
diff --git a/db/database_test.go b/db/database_test.go
index f64edb8e98..97c9330678 100644
--- a/db/database_test.go
+++ b/db/database_test.go
@@ -3025,9 +3025,6 @@ func TestGetDatabaseCollectionWithUserDefaultCollection(t *testing.T) {
 	require.NoError(t, err)
 	require.NotNil(t, ds)
 
-	dataStoreName, ok := base.AsDataStoreName(ds)
-	require.True(t, ok)
-
 	testCases := []struct {
 		name  string
 		scope string
@@ -3042,9 +3039,9 @@ func TestGetDatabaseCollectionWithUserDefaultCollection(t *testing.T) {
 			err: false,
 			options: DatabaseContextOptions{
 				Scopes: map[string]ScopeOptions{
-					dataStoreName.ScopeName(): ScopeOptions{
+					ds.ScopeName(): ScopeOptions{
 						Collections: map[string]CollectionOptions{
-							dataStoreName.CollectionName(): {},
+							ds.CollectionName(): {},
 						},
 					},
 					base.DefaultScope: ScopeOptions{
diff --git a/db/indextest/main_test.go b/db/indextest/main_test.go
index dde193a570..0290ebc43b 100644
--- a/db/indextest/main_test.go
+++ b/db/indextest/main_test.go
@@ -81,12 +81,6 @@ var primaryIndexReadier base.TBPBucketReadierFunc = func(ctx context.Context, b
 	if err != nil {
 		return err
 	}
-	dsName, ok := base.AsDataStoreName(dataStore)
-	if !ok {
-		err := fmt.Errorf("Could not determine datastore name from datastore: %+v", dataStore)
-		tbp.Logf(ctx, "%s", err)
-		return err
-	}
 
 	tbp.Logf(ctx, "dropping existing bucket indexes")
 	if err := db.EmptyPrimaryIndex(ctx, dataStore); err != nil {
@@ -104,7 +98,7 @@ var primaryIndexReadier base.TBPBucketReadierFunc = func(ctx context.Context, b
 	if len(indexes) != 1 && indexes[0] != base.PrimaryIndexName {
 		return fmt.Errorf("expected only primary index to be present, found: %v", indexes)
 	}
-	tbp.Logf(ctx, "waiting for empty bucket indexes %s.%s.%s", b.GetName(), dsName.ScopeName(), dsName.CollectionName())
+	tbp.Logf(ctx, "waiting for empty bucket indexes %s.%s.%s", b.GetName(), dataStore.ScopeName(), dataStore.CollectionName())
 	// wait for primary index to be empty
 	if err := db.WaitForPrimaryIndexEmpty(ctx, n1qlStore); err != nil {
 		tbp.Logf(ctx, "waitForPrimaryIndexEmpty returned an error: %v", err)
diff --git a/db/util_testing.go b/db/util_testing.go
index 336943206e..d7a1fab4e3 100644
--- a/db/util_testing.go
+++ b/db/util_testing.go
@@ -301,12 +301,6 @@ var viewsAndGSIBucketReadier base.TBPBucketReadierFunc = func(ctx context.Contex
 	if err != nil {
 		return err
 	}
-	dsName, ok := base.AsDataStoreName(dataStore)
-	if !ok {
-		err := fmt.Errorf("Could not determine datastore name from datastore: %+v", dataStore)
-		tbp.Logf(ctx, "%s", err)
-		return err
-	}
 	if _, err := emptyAllDocsIndex(ctx, dataStore, tbp); err != nil {
 		return err
 	}
@@ -317,7 +311,7 @@ var viewsAndGSIBucketReadier base.TBPBucketReadierFunc = func(ctx context.Contex
 	if !ok {
 		return errors.New("attempting to empty indexes with non-N1QL store")
 	}
-	tbp.Logf(ctx, "waiting for empty bucket indexes %s.%s.%s", b.GetName(), dsName.ScopeName(), dsName.CollectionName())
+	tbp.Logf(ctx, "waiting for empty bucket indexes %s.%s.%s", b.GetName(), dataStore.ScopeName(), dataStore.CollectionName())
 	// we can't init indexes concurrently, so we'll just wait for them to be empty after emptying instead of recreating.
 	if err := WaitForPrimaryIndexEmpty(ctx, n1qlStore); err != nil {
 		tbp.Logf(ctx, "waitForPrimaryIndexEmpty returned an error: %v", err)
@@ -390,13 +384,7 @@ var viewsAndGSIBucketInit base.TBPBucketInitFunc = func(ctx context.Context, b b
 		Serverless:      false,
 		MetadataIndexes: IndexesWithoutMetadata,
 	}
-	dsName, ok := base.AsDataStoreName(dataStore)
-	if !ok {
-		err := fmt.Errorf("Could not determine datastore name from datastore: %+v", dataStore)
-		tbp.Logf(ctx, "%s", err)
-		return err
-	}
-	if base.IsDefaultCollection(dsName.ScopeName(), dsName.CollectionName()) {
+	if base.IsDefaultCollection(dataStore.ScopeName(), dataStore.CollectionName()) {
 		options.MetadataIndexes = IndexesAll
 	}
 	if err := InitializeIndexes(ctx, n1qlStore, options); err != nil {
diff --git a/docs/api/components/schemas.yaml b/docs/api/components/schemas.yaml
index 644eecc46b..034430dce4 100644
--- a/docs/api/components/schemas.yaml
+++ b/docs/api/components/schemas.yaml
@@ -1273,12 +1273,11 @@ Database:
     db_state_changed:
      $ref: '#/Event-config'
    feed_type:
-      description: The type of feed to use to communicate with Couchbase Server.
+      description: The type of feed to use to communicate with Couchbase Server. DCP is always used, regardless of the value specified.
      type: string
      default: DCP
      enum:
        - DCP
-        - TAP
      deprecated: true
    allow_empty_password:
      description: This controls whether users that are created can have an empty password or not.
diff --git a/go.mod b/go.mod
index 89a0bb7316..46921ec62f 100644
--- a/go.mod
+++ b/go.mod
@@ -13,10 +13,10 @@ require (
 	github.com/couchbase/gocbcore/v10 v10.3.1
 	github.com/couchbase/gomemcached v0.2.1
 	github.com/couchbase/goutils v0.1.2
-	github.com/couchbase/sg-bucket v0.0.0-20240326230241-0b197e169b27
+	github.com/couchbase/sg-bucket v0.0.0-20240402154301-12625d8851a8
 	github.com/couchbaselabs/go-fleecedelta v0.0.0-20220909152808-6d09efa7a338
 	github.com/couchbaselabs/gocbconnstr v1.0.5
-	github.com/couchbaselabs/rosmar v0.0.0-20240326232309-04dfb3337b60
+	github.com/couchbaselabs/rosmar v0.0.0-20240404180245-795e6df684f3
 	github.com/elastic/gosigar v0.14.3
 	github.com/felixge/fgprof v0.9.3
 	github.com/google/uuid v1.6.0
diff --git a/go.sum b/go.sum
index c2987f2e64..c59f8ba9ad 100644
--- a/go.sum
+++ b/go.sum
@@ -53,6 +53,8 @@ github.com/couchbase/goutils v0.1.2 h1:gWr8B6XNWPIhfalHNog3qQKfGiYyh4K4VhO3P2o9B
 github.com/couchbase/goutils v0.1.2/go.mod h1:h89Ek/tiOxxqjz30nPPlwZdQbdB8BwgnuBxeoUe/ViE=
 github.com/couchbase/sg-bucket v0.0.0-20240326230241-0b197e169b27 h1:FGNvJsAQk6JZzuVXvoLXcoSQzOnQxWkywzYJFQqzXEg=
 github.com/couchbase/sg-bucket v0.0.0-20240326230241-0b197e169b27/go.mod h1:5me3TJLTPfR0s3aMJZcPoTu5FT8oelaINz5l7Q3cApE=
+github.com/couchbase/sg-bucket v0.0.0-20240402154301-12625d8851a8 h1:kfWMYvUgSg2yIZJx+t63Ucl+zorvFqlYayXPkiXFtSE=
+github.com/couchbase/sg-bucket v0.0.0-20240402154301-12625d8851a8/go.mod h1:5me3TJLTPfR0s3aMJZcPoTu5FT8oelaINz5l7Q3cApE=
 github.com/couchbase/tools-common/cloud v1.0.0 h1:SQZIccXoedbrThehc/r9BJbpi/JhwJ8X00PDjZ2gEBE=
 github.com/couchbase/tools-common/cloud v1.0.0/go.mod h1:6KVlRpbcnDWrvickUJ+xpqCWx1vgYYlEli/zL4xmZAg=
 github.com/couchbase/tools-common/fs v1.0.0 h1:HFA4xCF/r3BtZShFJUxzVvGuXtDkqGnaPzYJP3Kp1mw=
@@ -74,6 +76,8 @@ github.com/couchbaselabs/gocbconnstr/v2 v2.0.0-20230515165046-68b522a21131 h1:2E
 github.com/couchbaselabs/gocbconnstr/v2 v2.0.0-20230515165046-68b522a21131/go.mod h1:o7T431UOfFVHDNvMBUmUxpHnhivwv7BziUao/nMl81E=
 github.com/couchbaselabs/rosmar v0.0.0-20240326232309-04dfb3337b60 h1:w9E8CEvQia8BPA+2Ai6dJh64wYTmxNUrXNPkKhtPpGw=
 github.com/couchbaselabs/rosmar v0.0.0-20240326232309-04dfb3337b60/go.mod h1:MnlZ8BXE9Z7rUQEyb069P/6E9+YVkUxcqW5cmN23h0I=
+github.com/couchbaselabs/rosmar v0.0.0-20240404180245-795e6df684f3 h1:AUvojYsPc2WgiO9xRalRQLyXzooRRWemdEkiGl+PZno=
+github.com/couchbaselabs/rosmar v0.0.0-20240404180245-795e6df684f3/go.mod h1:SM0w4YHwXFMIyfqUbkpXZNWwAQKLwsUH91fsKUooMqw=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
diff --git a/rest/adminapitest/admin_api_test.go b/rest/adminapitest/admin_api_test.go
index 48b0a3a524..601e8a3012 100644
--- a/rest/adminapitest/admin_api_test.go
+++ b/rest/adminapitest/admin_api_test.go
@@ -984,15 +984,12 @@ func TestResyncUsingDCPStreamForNamedCollection(t *testing.T) {
 	dataStore1, err := rt.TestBucket.GetNamedDataStore(0)
 	require.NoError(t, err)
 
-	dataStore1Name, ok := base.AsDataStoreName(dataStore1)
-	require.True(t, ok)
-
 	// Run resync for single collection
 	// Request body {"scopes": "scopeName": ["collection1Name", "collection2Name"]}}
 	body := fmt.Sprintf(`{
 			"scopes" :{
 				"%s": ["%s"]
 			}
-		}`, dataStore1Name.ScopeName(), dataStore1Name.CollectionName())
+		}`, dataStore1.ScopeName(), dataStore1.CollectionName())
 	resp := rt.SendAdminRequest("POST", "/db/_resync?action=start", body)
 	rest.RequireStatus(t, resp, http.StatusOK)
 	resyncManagerStatus := rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted)
diff --git a/rest/adminapitest/collections_admin_api_test.go b/rest/adminapitest/collections_admin_api_test.go
index 2c08d48811..43d30ce4bc 100644
--- a/rest/adminapitest/collections_admin_api_test.go
+++ b/rest/adminapitest/collections_admin_api_test.go
@@ -46,14 +46,10 @@ func TestCollectionsSyncImportFunctions(t *testing.T) {
 
 	dataStore1, err := tb.GetNamedDataStore(0)
 	require.NoError(t, err)
-	dataStore1Name, ok := base.AsDataStoreName(dataStore1)
-	require.True(t, ok)
 	dataStore2, err := tb.GetNamedDataStore(1)
 	require.NoError(t, err)
-	dataStore2Name, ok := base.AsDataStoreName(dataStore2)
-	require.True(t, ok)
-	keyspace1 := fmt.Sprintf("%s.%s.%s", "db", dataStore1Name.ScopeName(), dataStore1Name.CollectionName())
-	keyspace2 := fmt.Sprintf("%s.%s.%s", "db", dataStore2Name.ScopeName(), dataStore2Name.CollectionName())
+	keyspace1 := fmt.Sprintf("%s.%s.%s", "db", dataStore1.ScopeName(), dataStore1.CollectionName())
+	keyspace2 := fmt.Sprintf("%s.%s.%s", "db", dataStore2.ScopeName(), dataStore2.CollectionName())
 
 	bucketConfig := fmt.Sprintf(
 		`{"bucket": "%s",
@@ -74,9 +70,9 @@ func TestCollectionsSyncImportFunctions(t *testing.T) {
 		  "num_index_replicas": 0,
 		  "enable_shared_bucket_access": true,
 		  "use_views": false}`,
-		tb.GetName(), dataStore1Name.ScopeName(), dataStore1Name.CollectionName(),
+		tb.GetName(), dataStore1.ScopeName(), dataStore1.CollectionName(),
 		importFilter1,
-		syncFunction1, dataStore2Name.CollectionName(),
+		syncFunction1, dataStore2.CollectionName(),
 		importFilter2,
 		syncFunction2,
 	)
diff --git a/rest/api_collections_test.go b/rest/api_collections_test.go
index 2666207527..fc7a3e34b8 100644
--- a/rest/api_collections_test.go
+++ b/rest/api_collections_test.go
@@ -45,8 +45,6 @@ func TestCollectionsPutDocInKeyspace(t *testing.T) {
 	defer rt.Close()
 
 	ds := rt.GetSingleDataStore()
-	dataStoreName, ok := base.AsDataStoreName(ds)
-	require.True(t, ok)
 	tests := []struct {
 		name           string
 		keyspace       string
@@ -60,17 +58,17 @@ func TestCollectionsPutDocInKeyspace(t *testing.T) {
 		},
 		{
 			name:           "collection only",
-			keyspace:       strings.Join([]string{dbName, dataStoreName.CollectionName()}, base.ScopeCollectionSeparator),
+			keyspace:       strings.Join([]string{dbName, ds.CollectionName()}, base.ScopeCollectionSeparator),
 			expectedStatus: http.StatusCreated,
 		},
 		{
 			name:           "invalid collection",
-			keyspace:       strings.Join([]string{dbName, dataStoreName.ScopeName(), "buzz"}, base.ScopeCollectionSeparator),
+			keyspace:       strings.Join([]string{dbName, ds.ScopeName(), "buzz"}, base.ScopeCollectionSeparator),
 			expectedStatus: http.StatusNotFound,
 		},
 		{
 			name:           "invalid scope",
-			keyspace:       strings.Join([]string{dbName, "buzz", dataStoreName.CollectionName()}, base.ScopeCollectionSeparator),
+			keyspace:       strings.Join([]string{dbName, "buzz", ds.CollectionName()}, base.ScopeCollectionSeparator),
 			expectedStatus: http.StatusNotFound,
 		},
 		{
diff --git a/rest/config.go b/rest/config.go
index 33b8b998be..78ff575449 100644
--- a/rest/config.go
+++ b/rest/config.go
@@ -71,6 +71,8 @@ const (
 	DefaultUseTLSServer = true
 
 	DefaultMinConfigFetchInterval = time.Second
+
+	tapFeedType = "tap"
 )
 
 // serverType indicates which type of HTTP server sync gateway is running
@@ -152,7 +154,7 @@ type DbConfig struct {
 	ImportFilter       *string             `json:"import_filter,omitempty"`         // The import filter applied to import operations in the _default scope and collection
 	ImportBackupOldRev *bool               `json:"import_backup_old_rev,omitempty"` // Whether import should attempt to create a temporary backup of the previous revision body, when available.
 	EventHandlers      *EventHandlerConfig `json:"event_handlers,omitempty"`        // Event handlers (webhook)
-	FeedType           string              `json:"feed_type,omitempty"`             // Feed type - "DCP" or "TAP"; defaults based on Couchbase server version
+	FeedType           string              `json:"feed_type,omitempty"`             // Feed type - "DCP" only, "TAP" is ignored
 	AllowEmptyPassword *bool               `json:"allow_empty_password,omitempty"`  // Allow empty passwords? Defaults to false
 	CacheConfig        *CacheConfig        `json:"cache,omitempty"`                 // Cache settings
 	DeprecatedRevCacheSize *uint32         `json:"rev_cache_size,omitempty"`        // Maximum number of revisions to store in the revision cache (deprecated, CBG-356)
@@ -781,7 +783,7 @@ func (dbConfig *DbConfig) validateVersion(ctx context.Context, isEnterpriseEditi
 	if err != nil {
 		multiError = multiError.Append(err)
 	}
-	if dbConfig.FeedType == base.TapFeedType && autoImportEnabled == true {
+	if dbConfig.FeedType == tapFeedType && autoImportEnabled == true {
 		multiError = multiError.Append(fmt.Errorf("Invalid configuration for Sync Gw. TAP feed type can not be used with auto-import"))
 	}
diff --git a/rest/replicatortest/replicator_test.go b/rest/replicatortest/replicator_test.go
index 8882b9f898..13592893e9 100644
--- a/rest/replicatortest/replicator_test.go
+++ b/rest/replicatortest/replicator_test.go
@@ -7931,9 +7931,7 @@ func TestGroupIDReplications(t *testing.T) {
 	if !rt.GetDatabase().OnlyDefaultCollection() {
 		dataStore, err = activeBucket.GetNamedDataStore(0)
 		require.NoError(t, err)
-		dsName, ok := base.AsDataStoreName(dataStore)
-		require.True(t, ok)
-		keyspace = fmt.Sprintf("/db.%s.%s/", dsName.ScopeName(), dsName.CollectionName())
+		keyspace = fmt.Sprintf("/db.%s.%s/", dataStore.ScopeName(), dataStore.CollectionName())
 	}
 	for groupNum, group := range groupIDs {
 		channel := "chan" + group
diff --git a/rest/server_context.go b/rest/server_context.go
index e604ce3150..915891a1e2 100644
--- a/rest/server_context.go
+++ b/rest/server_context.go
@@ -718,11 +718,7 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config
 				MetadataIndexes: indexInfo.indexSet,
 				UseXattrs:       config.UseXattrs(),
 			}
-			dsName, ok := base.AsDataStoreName(ds)
-			if !ok {
-				return nil, fmt.Errorf("Could not get datastore name from %s", base.MD(ds.GetName()))
-			}
-			ctx := base.KeyspaceLogCtx(ctx, bucket.GetName(), dsName.ScopeName(), dsName.CollectionName())
+			ctx := base.KeyspaceLogCtx(ctx, bucket.GetName(), ds.ScopeName(), ds.CollectionName())
 			indexErr := db.InitializeIndexes(ctx, n1qlStore, options)
 			if indexErr != nil {
 				return nil, indexErr
@@ -1334,15 +1330,11 @@ func validateMetadataStore(ctx context.Context, metadataStore base.DataStore) er
 	if err == nil {
 		return nil
 	}
-	metadataStoreName, ok := base.AsDataStoreName(metadataStore)
-	if ok {
-		keyspace := strings.Join([]string{metadataStore.GetName(), metadataStoreName.ScopeName(), metadataStoreName.CollectionName()}, base.ScopeCollectionSeparator)
-		if base.IsDefaultCollection(metadataStoreName.ScopeName(), metadataStoreName.CollectionName()) {
-			base.WarnfCtx(ctx, "_default._default has been deleted from the server for bucket %s, to recover recreate the bucket", metadataStore.GetName())
-		}
-		return fmt.Errorf("metadata store %s does not exist on couchbase server: %w", base.MD(keyspace), err)
+	keyspace := strings.Join([]string{metadataStore.GetName(), metadataStore.ScopeName(), metadataStore.CollectionName()}, base.ScopeCollectionSeparator)
+	if base.IsDefaultCollection(metadataStore.ScopeName(), metadataStore.CollectionName()) {
+		base.WarnfCtx(ctx, "_default._default has been deleted from the server for bucket %s, to recover recreate the bucket", metadataStore.GetName())
 	}
-	return fmt.Errorf("metadata store %s does not exist on couchbase server: %w", base.MD(metadataStore.GetName()), err)
+	return fmt.Errorf("metadata store %s does not exist on couchbase server: %w", base.MD(keyspace), err)
 }
 
 // validateEventConfigOptions returns errors for all invalid event type options.
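For reference, the only feed_type logic that survives this change is the validation guard in rest/config.go above: DCP is always used, but a legacy config that combines "tap" with auto-import is still rejected explicitly. A hedged standalone sketch of that behavior (the real check lives inside DbConfig.validateVersion, whose signature differs):

```go
// Standalone sketch of the surviving "tap" validation; names are illustrative.
package main

import (
	"errors"
	"fmt"
)

const tapFeedType = "tap"

// validateFeedType mirrors the literal comparison in the diff: the feed itself
// is always DCP now, but tap + auto-import remains a configuration error.
func validateFeedType(feedType string, autoImportEnabled bool) error {
	if feedType == tapFeedType && autoImportEnabled {
		return errors.New("Invalid configuration for Sync Gw. TAP feed type can not be used with auto-import")
	}
	return nil
}

func main() {
	fmt.Println(validateFeedType("tap", true)) // non-nil error
	fmt.Println(validateFeedType("DCP", true)) // <nil>
}
```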
diff --git a/rest/utilities_testing_test.go b/rest/utilities_testing_test.go
index c7b426aaf8..f14816e65c 100644
--- a/rest/utilities_testing_test.go
+++ b/rest/utilities_testing_test.go
@@ -207,12 +207,8 @@ func TestRestTesterTemplateMultipleDatabases(t *testing.T) {
 	dbOne := "dbone"
 	bucket1Datastore1, err := rt.TestBucket.GetNamedDataStore(0)
 	require.NoError(t, err)
-	bucket1Datastore1Name, ok := base.AsDataStoreName(bucket1Datastore1)
-	require.True(t, ok)
 	bucket1Datastore2, err := rt.TestBucket.GetNamedDataStore(1)
 	require.NoError(t, err)
-	bucket1Datastore2Name, ok := base.AsDataStoreName(bucket1Datastore2)
-	require.True(t, ok)
 	resp := rt.CreateDatabase(dbOne, dbConfig)
 	RequireStatus(t, resp, http.StatusCreated)
 	testCases = []struct {
@@ -236,11 +232,11 @@ func TestRestTesterTemplateMultipleDatabases(t *testing.T) {
 		},
 		{
 			input:  "/{{.keyspace1}}/",
-			output: fmt.Sprintf("/%s.%s.%s/", dbOne, bucket1Datastore1Name.ScopeName(), bucket1Datastore1Name.CollectionName()),
+			output: fmt.Sprintf("/%s.%s.%s/", dbOne, bucket1Datastore1.ScopeName(), bucket1Datastore1.CollectionName()),
 		},
 		{
 			input:  "/{{.keyspace2}}/",
-			output: fmt.Sprintf("/%s.%s.%s/", dbOne, bucket1Datastore2Name.ScopeName(), bucket1Datastore2Name.CollectionName()),
+			output: fmt.Sprintf("/%s.%s.%s/", dbOne, bucket1Datastore2.ScopeName(), bucket1Datastore2.CollectionName()),
 		},
 	}
 	for _, test := range testCases {
@@ -266,12 +262,8 @@ func TestRestTesterTemplateMultipleDatabases(t *testing.T) {
 	dbTwo := "dbtwo"
 	bucket2Datastore1, err := rt.TestBucket.GetNamedDataStore(0)
 	require.NoError(t, err)
-	bucket2Datastore1Name, ok := base.AsDataStoreName(bucket2Datastore1)
-	require.True(t, ok)
 	bucket2Datastore2, err := rt.TestBucket.GetNamedDataStore(1)
 	require.NoError(t, err)
-	bucket2Datastore2Name, ok := base.AsDataStoreName(bucket2Datastore2)
-	require.True(t, ok)
 	resp = rt.CreateDatabase(dbTwo, dbConfig)
 	RequireStatus(t, resp, http.StatusCreated)
 	testCases = []struct {
@@ -305,20 +297,20 @@ func TestRestTesterTemplateMultipleDatabases(t *testing.T) {
 		},
 		{
 			input:  "/{{.db1keyspace1}}/",
-			output: fmt.Sprintf("/%s.%s.%s/", dbOne, bucket1Datastore1Name.ScopeName(), bucket2Datastore1Name.CollectionName()),
+			output: fmt.Sprintf("/%s.%s.%s/", dbOne, bucket1Datastore1.ScopeName(), bucket2Datastore1.CollectionName()),
 		},
 		{
 			input:  "/{{.db1keyspace2}}/",
-			output: fmt.Sprintf("/%s.%s.%s/", dbOne, bucket1Datastore2Name.ScopeName(), bucket2Datastore2Name.CollectionName()),
+			output: fmt.Sprintf("/%s.%s.%s/", dbOne, bucket1Datastore2.ScopeName(), bucket2Datastore2.CollectionName()),
 		},
 		{
 			input:  "/{{.db2keyspace1}}/",
-			output: fmt.Sprintf("/%s.%s.%s/", dbTwo, bucket2Datastore1Name.ScopeName(), bucket2Datastore1Name.CollectionName()),
+			output: fmt.Sprintf("/%s.%s.%s/", dbTwo, bucket2Datastore1.ScopeName(), bucket2Datastore1.CollectionName()),
 		},
 		{
 			input:  "/{{.db2keyspace2}}/",
-			output: fmt.Sprintf("/%s.%s.%s/", dbTwo, bucket2Datastore2Name.ScopeName(), bucket2Datastore2Name.CollectionName()),
+			output: fmt.Sprintf("/%s.%s.%s/", dbTwo, bucket2Datastore2.ScopeName(), bucket2Datastore2.CollectionName()),
 		},
 	}
 	for _, test := range testCases {
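Finally, the core of the base/leaky_bucket.go change above: the TAP-era channel-wrapping feeds are gone, and DCPFeedMissingDocs is implemented by wrapping the caller's DCP callback. A minimal standalone sketch of that pattern; it uses a set lookup where the real code linearly scans the configured keys, and the FeedEvent/callback types here are simplified stand-ins for sg-bucket's:

```go
package main

import "fmt"

// Simplified stand-ins for sgbucket.FeedEvent and sgbucket.FeedEventCallbackFunc.
type FeedEvent struct{ Key []byte }
type FeedEventCallbackFunc func(event FeedEvent) bool

// wrapMissingDocs returns a callback that silently drops events for the
// configured keys and forwards everything else, which is how the leaky
// bucket makes a document appear to never arrive on the DCP feed.
func wrapMissingDocs(missing []string, cb FeedEventCallbackFunc) FeedEventCallbackFunc {
	missingSet := make(map[string]struct{}, len(missing))
	for _, k := range missing {
		missingSet[k] = struct{}{}
	}
	return func(event FeedEvent) bool {
		if _, drop := missingSet[string(event.Key)]; drop {
			return false // swallow the mutation
		}
		return cb(event)
	}
}

func main() {
	cb := wrapMissingDocs([]string{"doc-3"}, func(e FeedEvent) bool {
		fmt.Printf("processed %s\n", e.Key)
		return true
	})
	cb(FeedEvent{Key: []byte("doc-1")}) // forwarded
	cb(FeedEvent{Key: []byte("doc-3")}) // dropped
}
```

Because the wrapping happens at the callback layer rather than in a goroutine copying between channels, the change also removes the wrappedTapFeedImpl plumbing and its dedupe timer wholesale, which is what TestStopChangeCache now relies on via DCPFeedMissingDocs.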