Skip to content

Commit

Permalink
CBG-3386 turn off failOnRollback for resync (#6463)
Browse files Browse the repository at this point in the history
  • Loading branch information
torcolvin authored Sep 26, 2023
1 parent 408b47e commit 1a2b934
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 6 deletions.
13 changes: 7 additions & 6 deletions db/background_mgr_resync_dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]interface
base.InfofCtx(ctx, base.KeyAll, "[%s] running resync against specified collections", resyncLoggingID)
}

clientOptions := getReSyncDCPClientOptions(collectionIDs, db.Options.GroupID, db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID))
clientOptions := getResyncDCPClientOptions(collectionIDs, db.Options.GroupID, db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID))

dcpFeedKey := generateResyncDCPStreamName(r.ResyncID)
dcpFeedKey := GenerateResyncDCPStreamName(r.ResyncID)
dcpClient, err := base.NewDCPClient(ctx, dcpFeedKey, callback, *clientOptions, bucket)
if err != nil {
base.WarnfCtx(ctx, "[%s] Failed to create resync DCP client! %v", resyncLoggingID, err)
Expand Down Expand Up @@ -326,19 +326,20 @@ type ResyncManagerStatusDocDCP struct {
ResyncManagerMeta `json:"meta"`
}

// getReSyncDCPClientOptions returns the default set of DCPClientOptions suitable for resync
func getReSyncDCPClientOptions(collectionIDs []uint32, groupID string, prefix string) *base.DCPClientOptions {
// getResyncDCPClientOptions returns the default set of DCPClientOptions suitable for resync
func getResyncDCPClientOptions(collectionIDs []uint32, groupID string, prefix string) *base.DCPClientOptions {
return &base.DCPClientOptions{
OneShot: true,
FailOnRollback: true,
FailOnRollback: false,
MetadataStoreType: base.DCPMetadataStoreCS,
GroupID: groupID,
CollectionIDs: collectionIDs,
CheckpointPrefix: prefix,
}
}

func generateResyncDCPStreamName(resyncID string) string {
// GenerateResyncDCPStreamName returns the DCP stream name for a resync.
func GenerateResyncDCPStreamName(resyncID string) string {
return fmt.Sprintf(
"sg-%v:resync:%v",
base.ProductAPIVersion,
Expand Down
70 changes: 70 additions & 0 deletions rest/adminapitest/resync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2023-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.

package adminapitest

import (
"fmt"
"net/http"
"testing"

"github.com/couchbase/gocbcore/v10"
"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/db"
"github.com/couchbase/sync_gateway/rest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestResyncRollback ensures that we allow rollback of
func TestResyncRollback(t *testing.T) {
if base.UnitTestUrlIsWalrus() {
t.Skip("This test doesn't works with walrus")
}
rt := rest.NewRestTester(t, &rest.RestTesterConfig{
SyncFn: `function(doc) { channel("x") }`, // use custom sync function to increment sync function counter
})

defer rt.Close()

numDocs := 10
for i := 0; i < numDocs; i++ {
rt.CreateDoc(t, fmt.Sprintf("doc%v", i))
}
assert.Equal(t, int64(numDocs), rt.GetDatabase().DbStats.Database().SyncFunctionCount.Value())

response := rt.SendAdminRequest("POST", "/{{.db}}/_offline", "")
rest.RequireStatus(t, response, http.StatusOK)
require.NoError(t, rt.WaitForDBState(db.RunStateString[db.DBOffline]))

// we need to wait for the resync to start and not finish so we get a partial completion
resp := rt.SendAdminRequest("POST", "/{{.db}}/_resync", "")
rest.RequireStatus(t, resp, http.StatusOK)
_ = rt.WaitForResyncDCPStatus(db.BackgroundProcessStateRunning)

// immediately stop the resync process (we just need the status data to be persisted to the bucket), we are looking for partial completion
resp = rt.SendAdminRequest("POST", "/{{.db}}/_resync?action=stop", "")
rest.RequireStatus(t, resp, http.StatusOK)
status := rt.WaitForResyncDCPStatus(db.BackgroundProcessStateStopped)
// make sure this hasn't accidentally completed
require.Equal(t, db.BackgroundProcessStateStopped, status.State)

// alter persisted dcp metadata from the first run to force a rollback
name := db.GenerateResyncDCPStreamName(status.ResyncID)
checkpointPrefix := fmt.Sprintf("%s:%v", rt.GetDatabase().MetadataKeys.DCPCheckpointPrefix(rt.GetDatabase().Options.GroupID), name)
meta := base.NewDCPMetadataCS(rt.Context(), rt.Bucket().DefaultDataStore(), 1024, 8, checkpointPrefix)
vbMeta := meta.GetMeta(0)
var garbageVBUUID gocbcore.VbUUID = 1234
vbMeta.VbUUID = garbageVBUUID
meta.SetMeta(0, vbMeta)
meta.Persist(rt.Context(), 0, []uint16{0})

response = rt.SendAdminRequest("POST", "/db/_resync?action=start", "")
rest.RequireStatus(t, response, http.StatusOK)
status = rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted)
}

0 comments on commit 1a2b934

Please sign in to comment.