Make change stream restartable.
This entails a small refactor of the change stream code so that the
change stream’s creation and iteration both happen under a retryer.
FGasper committed Nov 23, 2024
1 parent 3c81778 commit 03853c7
Showing 31 changed files with 3,474 additions and 39 deletions.
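
The pattern the commit message describes shows up in StartChangeStream below: stream creation and iteration now run inside a single retry callback, and a one-shot channel hands the caller either the initial timestamp or the first startup error, while failures after a successful start go to a long-lived error channel. Here is a minimal, self-contained sketch of that handoff shape; retryTransient, createStream, and iterateStream are illustrative stand-ins, not this repository's retry API:

package sketch

import (
    "context"
    "time"
)

// result stands in for mo.Result[primitive.Timestamp]: a value or an
// error, never both.
type result struct {
    ts  time.Time
    err error
}

// start launches a goroutine that repeatedly (re)creates and iterates
// a stream under a retry policy. The first outcome is returned
// synchronously; failures after a successful start are reported on
// errChan, like the verifier's changeStreamErrChan.
func start(ctx context.Context, errChan chan<- error) (time.Time, error) {
    resultChan := make(chan result)

    go func() {
        parentWaiting := true

        err := retryTransient(ctx, func() error {
            ts, err := createStream(ctx)
            if err != nil {
                return err
            }

            // Hand the caller its answer exactly once.
            if parentWaiting {
                resultChan <- result{ts: ts}
                parentWaiting = false
            }

            return iterateStream(ctx)
        })

        if err != nil {
            if parentWaiting {
                resultChan <- result{err: err} // startup failed
            } else {
                errChan <- err // failed after a successful start
            }
        }
    }()

    r := <-resultChan
    return r.ts, r.err
}

// retryTransient reruns fn until it succeeds or fails with an error
// the policy considers permanent.
func retryTransient(ctx context.Context, fn func() error) error {
    for {
        err := fn()
        if err == nil || ctx.Err() != nil || !isTransient(err) {
            return err
        }
        time.Sleep(time.Second) // crude backoff between restarts
    }
}

// Placeholders so the sketch compiles; the real code wires these to
// createChangeStream, iterateChangeStream, and the retry package.
func isTransient(error) bool                           { return false }
func createStream(context.Context) (time.Time, error) { return time.Now(), nil }
func iterateStream(context.Context) error             { return nil }

The one-shot flip of parentWaiting is what lets a restart that fails minutes later report through errChan rather than through a channel nobody is reading anymore.
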
1 change: 1 addition & 0 deletions go.mod
@@ -12,6 +12,7 @@ require (
     github.com/pkg/errors v0.9.1
     github.com/rs/zerolog v1.28.0
     github.com/samber/lo v1.47.0
+    github.com/samber/mo v1.13.0
     github.com/stretchr/testify v1.8.0
     github.com/urfave/cli v1.22.9
     go.mongodb.org/mongo-driver v1.17.1
2 changes: 2 additions & 0 deletions go.sum
@@ -84,6 +84,8 @@ github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf
 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
 github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
 github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
+github.com/samber/mo v1.13.0 h1:LB1OwfJMju3a6FjghH+AIvzMG0ZPOzgTWj1qaHs1IQ4=
+github.com/samber/mo v1.13.0/go.mod h1:BfkrCPuYzVG3ZljnZB783WIJIGk1mcZr9c9CPf8tAxs=
 github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
5 changes: 3 additions & 2 deletions internal/util/error.go
@@ -20,8 +20,9 @@ import (
 // `ErrorCode` newtype, but that requires a more invasive change to everything
 // that uses error codes.
 const (
-    LockFailed              int = 107
-    SampleTooManyDuplicates int = 28799
+    LockFailed              = 107
+    SampleTooManyDuplicates = 28799
+    CursorKilled            = 237
 )
 
 //
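
The new CursorKilled constant matches server error code 237, which MongoDB returns when a cursor is killed out from under a client (for example via killOp). A small sketch of how calling code can test an error for that code with the Go driver; the helper name is illustrative:

package sketch

import (
    "errors"

    "go.mongodb.org/mongo-driver/mongo"
)

// isCursorKilled reports whether err carries server error code 237
// (CursorKilled). ServerError.HasErrorCode also matches codes nested
// inside write and write-concern errors.
func isCursorKilled(err error) bool {
    var srvErr mongo.ServerError
    return errors.As(err, &srvErr) && srvErr.HasErrorCode(237)
}

This is the classification the new retryer configuration in change_stream.go relies on when it treats CursorKilled as a transient, retryable error.
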
119 changes: 84 additions & 35 deletions internal/verifier/change_stream.go
@@ -6,8 +6,11 @@ import (
     "time"
 
     "github.com/10gen/migration-verifier/internal/keystring"
+    "github.com/10gen/migration-verifier/internal/retry"
+    "github.com/10gen/migration-verifier/internal/util"
     "github.com/pkg/errors"
     "github.com/rs/zerolog"
+    "github.com/samber/mo"
     "go.mongodb.org/mongo-driver/bson"
     "go.mongodb.org/mongo-driver/bson/primitive"
     "go.mongodb.org/mongo-driver/mongo"
@@ -63,7 +66,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
     for i, changeEvent := range batch {
         if changeEvent.ClusterTime != nil &&
             (verifier.lastChangeEventTime == nil ||
-                verifier.lastChangeEventTime.Compare(*changeEvent.ClusterTime) < 0) {
+                verifier.lastChangeEventTime.Before(*changeEvent.ClusterTime)) {
             verifier.lastChangeEventTime = changeEvent.ClusterTime
         }
         switch changeEvent.OpType {
@@ -175,9 +178,7 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
     return nil
 }
 
-func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) {
-    defer cs.Close(ctx)
-
+func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) error {
     var lastPersistedTime time.Time
 
     persistResumeTokenIfNeeded := func() error {
@@ -201,10 +202,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
 
     // If the context is canceled, return immediately.
     case <-ctx.Done():
-        verifier.logger.Debug().
-            Err(ctx.Err()).
-            Msg("Change stream quitting.")
-        return
+        return ctx.Err()
 
     // If the changeStreamEnderChan has a message, the user has indicated that
     // source writes are ended. This means we should exit rather than continue
@@ -222,10 +220,11 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
             var curTs primitive.Timestamp
             curTs, err = extractTimestampFromResumeToken(cs.ResumeToken())
             if err != nil {
-                err = errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
-                break
+                return errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
             }
 
+            // writesOffTs never refers to a real event,
+            // so we can stop once curTs >= writesOffTs.
             if !curTs.Before(writesOffTs) {
                 verifier.logger.Debug().
                     Interface("currentTimestamp", curTs).
@@ -238,7 +237,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
             err = verifier.readAndHandleOneChangeEventBatch(ctx, cs)
 
             if err != nil {
-                break
+                return err
             }
         }
 
@@ -248,17 +247,9 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
         if err == nil {
             err = persistResumeTokenIfNeeded()
         }
-    }
-
-    if err != nil && !errors.Is(err, context.Canceled) {
-        verifier.logger.Debug().
-            Err(err).
-            Msg("Sending change stream error.")
-
-        verifier.changeStreamErrChan <- err
-
-        if !gotwritesOffTimestamp {
-            break
+
+        if err != nil {
+            return err
         }
     }
 
@@ -284,18 +275,21 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
     }
 
     infoLog.Msg("Change stream is done.")
+
+    return nil
 }
 
-// StartChangeStream starts the change stream.
-func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
+func (verifier *Verifier) createChangeStream(
+    ctx context.Context,
+) (*mongo.ChangeStream, primitive.Timestamp, error) {
     pipeline := verifier.GetChangeStreamFilter()
     opts := options.ChangeStream().
         SetMaxAwaitTime(1 * time.Second).
         SetFullDocument(options.UpdateLookup)
 
     savedResumeToken, err := verifier.loadChangeStreamResumeToken(ctx)
     if err != nil {
-        return errors.Wrap(err, "failed to load persisted change stream resume token")
+        return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token")
     }
 
     csStartLogEvent := verifier.logger.Info()
@@ -322,40 +316,95 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
 
     sess, err := verifier.srcClient.StartSession()
     if err != nil {
-        return errors.Wrap(err, "failed to start session")
+        return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to start session")
     }
     sctx := mongo.NewSessionContext(ctx, sess)
     srcChangeStream, err := verifier.srcClient.Watch(sctx, pipeline, opts)
     if err != nil {
-        return errors.Wrap(err, "failed to open change stream")
+        return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to open change stream")
     }
 
     err = verifier.persistChangeStreamResumeToken(ctx, srcChangeStream)
     if err != nil {
-        return err
+        return nil, primitive.Timestamp{}, err
     }
 
-    csTimestamp, err := extractTimestampFromResumeToken(srcChangeStream.ResumeToken())
+    startTs, err := extractTimestampFromResumeToken(srcChangeStream.ResumeToken())
     if err != nil {
-        return errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
+        return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
     }
 
     // With sharded clusters the resume token might lead the cluster time
     // by 1 increment. In that case we need the actual cluster time;
     // otherwise we will get errors.
     clusterTime, err := getClusterTimeFromSession(sess)
     if err != nil {
-        return errors.Wrap(err, "failed to read cluster time from session")
+        return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session")
     }
 
-    verifier.srcStartAtTs = &csTimestamp
-    if csTimestamp.After(clusterTime) {
-        verifier.srcStartAtTs = &clusterTime
+    if startTs.After(clusterTime) {
+        startTs = clusterTime
     }
 
+    return srcChangeStream, startTs, nil
+}
+
+// StartChangeStream starts the change stream.
+func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
+    // Result seems a bit simpler than messing with 2 separate channels.
+    resultChan := make(chan mo.Result[primitive.Timestamp])
+
+    go func() {
+        retryer := retry.New(retry.DefaultDurationLimit)
+        retryer = retryer.WithErrorCodes(util.CursorKilled)
+
+        parentThreadWaiting := true
+
+        err := retryer.
+            RunForTransientErrorsOnly(
+                ctx,
+                verifier.logger,
+                func(i *retry.Info) error {
+                    srcChangeStream, startTs, err := verifier.createChangeStream(ctx)
+                    if err != nil {
+                        return err
+                    }
+
+                    defer srcChangeStream.Close(ctx)
+
+                    if parentThreadWaiting {
+                        resultChan <- mo.Ok(startTs)
+                        close(resultChan)
+                        parentThreadWaiting = false
+                    }
+
+                    return verifier.iterateChangeStream(ctx, srcChangeStream)
+                },
+            )
+
+        if err != nil {
+            if parentThreadWaiting {
+                resultChan <- mo.Err[primitive.Timestamp](err)
+            } else {
+                verifier.changeStreamErrChan <- err
+                close(verifier.changeStreamErrChan)
+            }
+        }
+    }()
+
+    result := <-resultChan
+
+    startTs, err := result.Get()
+    if err != nil {
+        return err
+    }
+
+    verifier.srcStartAtTs = &startTs
+
     verifier.mux.Lock()
     verifier.changeStreamRunning = true
     verifier.mux.Unlock()
 
-    go verifier.iterateChangeStream(ctx, srcChangeStream)
-
     return nil
 }

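
A note on the clamp at the end of createChangeStream: on sharded clusters the resume token's timestamp can lead the session's cluster time by one increment, and starting the verification window at a timestamp the cluster has not reached yet causes errors, so the code takes the smaller of the two. The repo's getClusterTimeFromSession helper is not shown in this diff; a plausible shape for it, using the driver's Session.ClusterTime accessor (an assumption, not the actual helper):

package sketch

import (
    "errors"

    "go.mongodb.org/mongo-driver/bson/primitive"
    "go.mongodb.org/mongo-driver/mongo"
)

// clusterTimeFromSession decodes {$clusterTime: {clusterTime: <ts>}}
// from the raw document that mongo.Session.ClusterTime returns once
// the session has been used for an operation.
func clusterTimeFromSession(sess mongo.Session) (primitive.Timestamp, error) {
    raw := sess.ClusterTime()
    if raw == nil {
        return primitive.Timestamp{}, errors.New("session has no cluster time yet")
    }

    t, i, ok := raw.Lookup("$clusterTime", "clusterTime").TimestampOK()
    if !ok {
        return primitive.Timestamp{}, errors.New("session cluster time is malformed")
    }

    return primitive.Timestamp{T: t, I: i}, nil
}
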
88 changes: 88 additions & 0 deletions internal/verifier/change_stream_test.go
@@ -11,6 +11,8 @@ import (
     "go.mongodb.org/mongo-driver/bson"
     "go.mongodb.org/mongo-driver/bson/primitive"
     "go.mongodb.org/mongo-driver/mongo"
+    "go.mongodb.org/mongo-driver/mongo/options"
+    "go.mongodb.org/mongo-driver/mongo/readconcern"
 )
 
 func TestChangeStreamFilter(t *testing.T) {
@@ -247,6 +249,92 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
     )
 }
 
+func (suite *IntegrationTestSuite) TestCursorKilledResilience() {
+    ctx := suite.Context()
+
+    verifier := suite.BuildVerifier()
+
+    db := suite.srcMongoClient.Database(suite.DBNameForTest())
+    coll := db.Collection("mycoll")
+    suite.Require().NoError(
+        db.CreateCollection(ctx, coll.Name()),
+    )
+
+    // start verifier
+    verifierRunner := RunVerifierCheck(suite.Context(), suite.T(), verifier)
+
+    // wait for generation 0 to end
+    verifierRunner.AwaitGenerationEnd()
+
+    const mvName = "Migration Verifier"
+
+    // Kill verifier’s change stream.
+    cursor, err := suite.srcMongoClient.Database(
+        "admin",
+        options.Database().SetReadConcern(readconcern.Local()),
+    ).Aggregate(
+        ctx,
+        mongo.Pipeline{
+            {
+                {"$currentOp", bson.D{
+                    {"idleCursors", true},
+                }},
+            },
+            {
+                {"$match", bson.D{
+                    {"clientMetadata.application.name", mvName},
+                    {"command.collection", "$cmd.aggregate"},
+                    {"cursor.originatingCommand.pipeline.0.$_internalChangeStreamOplogMatch",
+                        bson.D{{"$type", "object"}},
+                    },
+                }},
+            },
+        },
+    )
+    suite.Require().NoError(err)
+
+    var ops []bson.Raw
+    suite.Require().NoError(cursor.All(ctx, &ops))
+
+    for _, cursorRaw := range ops {
+        opId, err := cursorRaw.LookupErr("opid")
+        suite.Require().NoError(err, "should get opid from op")
+
+        suite.T().Logf("Killing change stream op %+v", opId)
+
+        suite.Require().NoError(
+            suite.srcMongoClient.Database("admin").RunCommand(
+                ctx,
+                bson.D{
+                    {"killOp", 1},
+                    {"op", opId},
+                },
+            ).Err(),
+        )
+    }
+
+    _, err = coll.InsertOne(
+        ctx,
+        bson.D{{"_id", "after kill"}},
+    )
+    suite.Require().NoError(err)
+
+    suite.Require().NoError(verifier.WritesOff(ctx))
+
+    suite.Require().NoError(verifierRunner.Await())
+
+    failedTasks, incompleteTasks, err := FetchFailedAndIncompleteTasks(
+        ctx,
+        verifier.verificationTaskCollection(),
+        verificationTaskVerifyDocuments,
+        verifier.generation,
+    )
+    suite.Require().NoError(err)
+
+    suite.Assert().Zero(incompleteTasks, "no incomplete tasks")
+    suite.Require().Len(failedTasks, 1, "expect one failed task")
+}
+
 func (suite *IntegrationTestSuite) TestManyInsertsBeforeWritesOff() {
     suite.testInsertsBeforeWritesOff(10_000)
 }
2 changes: 1 addition & 1 deletion internal/verifier/check.go
@@ -87,7 +87,7 @@ func (verifier *Verifier) CheckWorker(ctx context.Context) error {
     select {
     case err := <-verifier.changeStreamErrChan:
         cancel()
-        return err
+        return errors.Wrap(err, "change stream failed")
     case <-ctx.Done():
         cancel()
         return nil
2 changes: 1 addition & 1 deletion internal/verifier/migration_verifier.go
@@ -255,7 +255,7 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
 
     verifier.mux.Unlock()
 
-    // This has to happen under the lock because the change stream
+    // This has to happen outside the lock because the change stream
     // might be inserting docs into the recheck queue, which happens
     // under the lock.
     verifier.changeStreamWritesOffTsChan <- finalTs
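
The one-word comment fix above encodes a real deadlock hazard: changeStreamWritesOffTsChan is unbuffered, and its receiver (the change stream goroutine) also takes verifier.mux when it pushes rechecks, so sending while holding the mutex could block both sides forever. A stripped-down illustration of the hazard; the field names are generic, not the verifier's:

package sketch

import "sync"

type coordinator struct {
    mu sync.Mutex
    ch chan int // unbuffered, like changeStreamWritesOffTsChan
}

// Wrong: if the consumer is blocked in mu.Lock() on its way to the
// receive, this send can never complete, and mu is never released.
func (c *coordinator) writesOffDeadlocks() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.ch <- 42
}

// Right: finish the locked work, release mu, then send; the consumer
// is now free to take the lock, finish, and reach the receive.
func (c *coordinator) writesOff() {
    c.mu.Lock()
    // ...update state under the lock...
    c.mu.Unlock()

    c.ch <- 42
}

func (c *coordinator) consumerLoop() {
    for {
        c.mu.Lock() // e.g. enqueue rechecks under the lock
        c.mu.Unlock()

        select {
        case <-c.ch: // writes-off timestamp received; wind down
            return
        default: // no signal yet; keep handling events
        }
    }
}
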