From d78f6ffaa23cae91ae0247cf2961873de3e8a80a Mon Sep 17 00:00:00 2001 From: Sejong Kim Date: Fri, 6 Sep 2024 22:39:09 +0900 Subject: [PATCH] Add PushPull consistency test --- server/packs/packs.go | 11 ++ server/packs/packs_test.go | 232 +++++++++++++++++++++++++++++ server/rpc/yorkie_server.go | 8 +- test/bench/push_pull_bench_test.go | 12 +- 4 files changed, 253 insertions(+), 10 deletions(-) create mode 100644 server/packs/packs_test.go diff --git a/server/packs/packs.go b/server/packs/packs.go index ce76a3f9c..cbea66007 100644 --- a/server/packs/packs.go +++ b/server/packs/packs.go @@ -20,6 +20,7 @@ package packs import ( "context" + "errors" "fmt" "strconv" gotime "time" @@ -37,6 +38,10 @@ import ( "github.com/yorkie-team/yorkie/server/logging" ) +var ( + ErrCheckpointTest = errors.New("failure for checkpoint testing purpose") +) + // PushPullKey creates a new sync.Key of PushPull for the given document. func PushPullKey(projectID types.ID, docKey key.Key) sync.Key { return sync.NewKey(fmt.Sprintf("pushpull-%s-%s", projectID, docKey)) @@ -69,6 +74,7 @@ func PushPull( docInfo *database.DocInfo, reqPack *change.Pack, opts PushPullOptions, + cpTest bool, ) (*ServerPack, error) { start := gotime.Now() defer func() { @@ -123,6 +129,11 @@ func PushPull( } } + // For consistency testing purposes + if cpTest { + return nil, ErrCheckpointTest + } + if err := be.DB.UpdateClientInfoAfterPushPull(ctx, clientInfo, docInfo); err != nil { return nil, err } diff --git a/server/packs/packs_test.go b/server/packs/packs_test.go new file mode 100644 index 000000000..75403af7c --- /dev/null +++ b/server/packs/packs_test.go @@ -0,0 +1,232 @@ +/* + * Copyright 2024 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package packs_test + +import ( + "context" + "encoding/hex" + "fmt" + "log" + "net/http" + "os" + "testing" + + "connectrpc.com/connect" + "github.com/stretchr/testify/assert" + + "github.com/yorkie-team/yorkie/api/converter" + "github.com/yorkie-team/yorkie/api/types" + api "github.com/yorkie-team/yorkie/api/yorkie/v1" + "github.com/yorkie-team/yorkie/api/yorkie/v1/v1connect" + "github.com/yorkie-team/yorkie/client" + "github.com/yorkie-team/yorkie/pkg/document" + "github.com/yorkie-team/yorkie/pkg/document/time" + "github.com/yorkie-team/yorkie/server/backend" + "github.com/yorkie-team/yorkie/server/backend/database" + "github.com/yorkie-team/yorkie/server/backend/database/mongo" + "github.com/yorkie-team/yorkie/server/backend/housekeeping" + "github.com/yorkie-team/yorkie/server/clients" + "github.com/yorkie-team/yorkie/server/documents" + "github.com/yorkie-team/yorkie/server/packs" + "github.com/yorkie-team/yorkie/server/profiling/prometheus" + "github.com/yorkie-team/yorkie/server/rpc" + "github.com/yorkie-team/yorkie/test/helper" +) + +var ( + testRPCServer *rpc.Server + testRPCAddr = fmt.Sprintf("localhost:%d", helper.RPCPort) + testClient v1connect.YorkieServiceClient + testBackend *backend.Backend +) + +func TestMain(m *testing.M) { + met, err := prometheus.NewMetrics() + if err != nil { + log.Fatal(err) + } + + testBackend, err = backend.New(&backend.Config{ + AdminUser: helper.AdminUser, + AdminPassword: helper.AdminPassword, + UseDefaultProject: helper.UseDefaultProject, + ClientDeactivateThreshold: helper.ClientDeactivateThreshold, + SnapshotThreshold: helper.SnapshotThreshold, + AuthWebhookCacheSize: helper.AuthWebhookSize, + ProjectInfoCacheSize: helper.ProjectInfoCacheSize, + ProjectInfoCacheTTL: helper.ProjectInfoCacheTTL.String(), + AdminTokenDuration: helper.AdminTokenDuration, + }, &mongo.Config{ + ConnectionURI: helper.MongoConnectionURI, + YorkieDatabase: helper.TestDBName(), + ConnectionTimeout: helper.MongoConnectionTimeout, + PingTimeout: helper.MongoPingTimeout, + }, &housekeeping.Config{ + Interval: helper.HousekeepingInterval.String(), + CandidatesLimitPerProject: helper.HousekeepingCandidatesLimitPerProject, + ProjectFetchSize: helper.HousekeepingProjectFetchSize, + }, met) + if err != nil { + log.Fatal(err) + } + + project, err := testBackend.DB.FindProjectInfoByID( + context.Background(), + database.DefaultProjectID, + ) + if err != nil { + log.Fatal(err) + } + + testRPCServer, err = rpc.NewServer(&rpc.Config{ + Port: helper.RPCPort, + }, testBackend) + if err != nil { + log.Fatal(err) + } + + if err = testRPCServer.Start(); err != nil { + log.Fatalf("failed rpc listen: %s\n", err) + } + if err = helper.WaitForServerToStart(testRPCAddr); err != nil { + log.Fatal(err) + } + + authInterceptor := client.NewAuthInterceptor(project.PublicKey, "") + + conn := http.DefaultClient + testClient = v1connect.NewYorkieServiceClient( + conn, + "http://"+testRPCAddr, + connect.WithInterceptors(authInterceptor), + ) + + code := m.Run() + + if err := testBackend.Shutdown(); err != nil { + log.Fatal(err) + } + testRPCServer.Shutdown(true) + os.Exit(code) +} + +func TestPacks(t *testing.T) { + t.Run("pushpull consistency test", func(t *testing.T) { + ctx := context.Background() + + projectInfo, err := testBackend.DB.FindProjectInfoByID( + ctx, + database.DefaultProjectID, + ) + assert.NoError(t, err) + project := projectInfo.ToProject() + + activateResp, err := testClient.ActivateClient( + context.Background(), + connect.NewRequest(&api.ActivateClientRequest{ClientKey: helper.TestDocKey(t).String()})) + assert.NoError(t, err) + + clientID, _ := hex.DecodeString(activateResp.Msg.ClientId) + resPack, err := testClient.AttachDocument( + context.Background(), + connect.NewRequest(&api.AttachDocumentRequest{ + ClientId: activateResp.Msg.ClientId, + ChangePack: &api.ChangePack{ + DocumentKey: helper.TestDocKey(t).String(), + Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 1}, + Changes: []*api.Change{{ + Id: &api.ChangeID{ + ClientSeq: 1, + Lamport: 1, + ActorId: clientID, + }, + }}, + }, + }, + )) + assert.NoError(t, err) + + actorID, err := time.ActorIDFromBytes(clientID) + assert.NoError(t, err) + + docID := types.ID(resPack.Msg.DocumentId) + docRefKey := types.DocRefKey{ + ProjectID: project.ID, + DocID: docID, + } + + // 0. Check docInfo.ServerSeq and clientInfo.Checkpoint + docInfo, err := documents.FindDocInfoByRefKey(ctx, testBackend, docRefKey) + assert.NoError(t, err) + assert.Equal(t, int64(1), docInfo.ServerSeq) + + clientInfo, err := clients.FindActiveClientInfo(ctx, testBackend.DB, types.ClientRefKey{ + ProjectID: project.ID, + ClientID: types.IDFromActorID(actorID), + }) + assert.NoError(t, err) + assert.Equal(t, int64(1), clientInfo.Checkpoint(docID).ServerSeq) + assert.Equal(t, uint32(1), clientInfo.Checkpoint(docID).ClientSeq) + + // 1. Create a ChangePack with a single Change + pack, err := converter.FromChangePack(&api.ChangePack{ + DocumentKey: helper.TestDocKey(t).String(), + Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 2}, + Changes: []*api.Change{{ + Id: &api.ChangeID{ + ClientSeq: 2, + Lamport: 2, + ActorId: clientID, + }, + }}, + }) + assert.NoError(t, err) + + // 2-1. An arbitrary failure occurs while updating clientInfo + _, err = packs.PushPull(ctx, testBackend, project, clientInfo, docInfo, pack, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }, true) + assert.ErrorIs(t, err, packs.ErrCheckpointTest) + + // 2-2. docInfo.ServerSeq increases from 1 to 2 + docInfo, err = documents.FindDocInfoByRefKey(ctx, testBackend, docRefKey) + assert.NoError(t, err) + assert.Equal(t, int64(2), docInfo.ServerSeq) + + // 2-3. clientInfo.Checkpoint has not been updated + clientInfo, err = clients.FindActiveClientInfo(ctx, testBackend.DB, types.ClientRefKey{ + ProjectID: project.ID, + ClientID: types.IDFromActorID(actorID), + }) + assert.NoError(t, err) + assert.Equal(t, int64(1), clientInfo.Checkpoint(docID).ServerSeq) + assert.Equal(t, uint32(1), clientInfo.Checkpoint(docID).ClientSeq) + + // 3-1. A duplicate request is sent + _, err = packs.PushPull(ctx, testBackend, project, clientInfo, docInfo, pack, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }, false) + assert.NoError(t, err) + + // 3-2. The server should detect the duplication and not update docInfo.ServerSeq + docInfo, err = documents.FindDocInfoByRefKey(ctx, testBackend, docRefKey) + assert.NoError(t, err) + assert.Equal(t, int64(2), docInfo.ServerSeq) + }) +} diff --git a/server/rpc/yorkie_server.go b/server/rpc/yorkie_server.go index 86c273080..e9a19ed13 100644 --- a/server/rpc/yorkie_server.go +++ b/server/rpc/yorkie_server.go @@ -164,7 +164,7 @@ func (s *yorkieServer) AttachDocument( pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, packs.PushPullOptions{ Mode: types.SyncModePushPull, Status: document.StatusAttached, - }) + }, false) if err != nil { return nil, err } @@ -258,7 +258,7 @@ func (s *yorkieServer) DetachDocument( pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, packs.PushPullOptions{ Mode: types.SyncModePushPull, Status: status, - }) + }, false) if err != nil { return nil, err } @@ -349,7 +349,7 @@ func (s *yorkieServer) PushPullChanges( pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, packs.PushPullOptions{ Mode: syncMode, Status: document.StatusAttached, - }) + }, false) if err != nil { return nil, err } @@ -546,7 +546,7 @@ func (s *yorkieServer) RemoveDocument( pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, packs.PushPullOptions{ Mode: types.SyncModePushPull, Status: document.StatusRemoved, - }) + }, false) if err != nil { return nil, err } diff --git a/test/bench/push_pull_bench_test.go b/test/bench/push_pull_bench_test.go index ca6fff1bb..be5519ad9 100644 --- a/test/bench/push_pull_bench_test.go +++ b/test/bench/push_pull_bench_test.go @@ -147,7 +147,7 @@ func benchmarkPushChanges( _, err = packs.PushPull(ctx, be, project, clientInfos[0], docInfo, pack, packs.PushPullOptions{ Mode: types.SyncModePushPull, Status: document.StatusAttached, - }) + }, false) assert.NoError(b, err) } } @@ -177,7 +177,7 @@ func benchmarkPullChanges( _, err = packs.PushPull(ctx, be, project, pusherClientInfo, docInfo, pushPack, packs.PushPullOptions{ Mode: types.SyncModePushPull, Status: document.StatusAttached, - }) + }, false) assert.NoError(b, err) docInfo, err = documents.FindDocInfoByRefKey(ctx, be, docRefKey) @@ -187,7 +187,7 @@ func benchmarkPullChanges( _, err = packs.PushPull(ctx, be, project, pullerClientInfo, docInfo, pullPack, packs.PushPullOptions{ Mode: types.SyncModePushPull, Status: document.StatusAttached, - }) + }, false) assert.NoError(b, err) } } @@ -220,7 +220,7 @@ func benchmarkPushSnapshots( pulled, err := packs.PushPull(ctx, be, project, clientInfos[0], docInfo, pushPack, packs.PushPullOptions{ Mode: types.SyncModePushPull, Status: document.StatusAttached, - }) + }, false) assert.NoError(b, err) b.StopTimer() @@ -259,7 +259,7 @@ func benchmarkPullSnapshot( _, err = packs.PushPull(ctx, be, project, pusherClientInfo, docInfo, pushPack, packs.PushPullOptions{ Mode: types.SyncModePushPull, Status: document.StatusAttached, - }) + }, false) assert.NoError(b, err) docInfo, err = documents.FindDocInfoByRefKey(ctx, be, docRefKey) @@ -269,7 +269,7 @@ func benchmarkPullSnapshot( _, err = packs.PushPull(ctx, be, project, pullerClientInfo, docInfo, pullPack, packs.PushPullOptions{ Mode: types.SyncModePushPull, Status: document.StatusAttached, - }) + }, false) assert.NoError(b, err) } }