Skip to content

Commit

Permalink
Improve Version Vector Handling for Legacy SDK and Snapshots (#1096)
Browse files Browse the repository at this point in the history
This commit addressed critical version vector management issues across
multiple scenarios in document creation and editing. It implemented fixes for
legacy SDK changes, server-generated snapshots, and lamport timestamp
initialization to ensure accurate concurrent editing and version tracking.

---------

Co-authored-by: Youngteac Hong <[email protected]>
  • Loading branch information
chacha912 and hackerwins authored Dec 10, 2024
1 parent 608c7e4 commit e3045dc
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 7 deletions.
20 changes: 19 additions & 1 deletion pkg/document/change/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,16 @@ func (id ID) SyncClocks(other ID) ID {
lamport = other.lamport + 1
}

newID := NewID(id.clientSeq, InitialServerSeq, lamport, id.actorID, id.versionVector.Max(other.versionVector))
// NOTE(chacha912): For changes created by legacy SDK prior to v0.5.2 that lack version
// vectors, document's version vector was not being properly accumlated. To address this,
// we generate a version vector using the lamport timestamp when no version vector exists.
otherVV := other.versionVector
if len(otherVV) == 0 {
otherVV = otherVV.DeepCopy()
otherVV.Set(other.actorID, other.lamport)
}

newID := NewID(id.clientSeq, InitialServerSeq, lamport, id.actorID, id.versionVector.Max(otherVV))
newID.versionVector.Set(id.actorID, lamport)
return newID
}
Expand All @@ -117,6 +126,15 @@ func (id ID) SetClocks(otherLamport int64, vector time.VersionVector) ID {
lamport = otherLamport + 1
}

// NOTE(chacha912): Documents created by server may have an InitialActorID
// in their version vector. Although server is not an actual client, it
// generates document snapshots from changes by participating with an
// InitialActorID during document instance creation and accumulating stored
// changes in DB.
// Semantically, including a non-client actor in version vector is
// problematic. To address this, we remove the InitialActorID from snapshots.
vector.Unset(time.InitialActorID)

newID := NewID(id.clientSeq, id.serverSeq, lamport, id.actorID, id.versionVector.Max(vector))
newID.versionVector.Set(id.actorID, lamport)

Expand Down
2 changes: 1 addition & 1 deletion pkg/document/crdt/rga_tree_split.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ func (s *RGATreeSplit[V]) deleteNodes(
if versionVector == nil && maxCreatedAtMapByActor == nil {
// Local edit - use version vector comparison
clientLamportAtChange = time.MaxLamport
} else if versionVector != nil {
} else if len(versionVector) > 0 {
lamport, ok := versionVector.Get(actorID)
if ok {
clientLamportAtChange = lamport
Expand Down
2 changes: 1 addition & 1 deletion pkg/document/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (d *Document) ApplyChangePack(pack *change.Pack) error {
if hasSnapshot {
d.cloneRoot = nil
d.clonePresences = nil
if err := d.doc.applySnapshot(pack.Snapshot, pack.Checkpoint.ServerSeq, pack.VersionVector); err != nil {
if err := d.doc.applySnapshot(pack.Snapshot, pack.VersionVector); err != nil {
return err
}
} else {
Expand Down
14 changes: 10 additions & 4 deletions pkg/document/internal_document.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (d *InternalDocument) ApplyChangePack(pack *change.Pack, disableGC bool) er

// 01. Apply remote changes to both the cloneRoot and the document.
if hasSnapshot {
if err := d.applySnapshot(pack.Snapshot, pack.Checkpoint.ServerSeq, pack.VersionVector); err != nil {
if err := d.applySnapshot(pack.Snapshot, pack.VersionVector); err != nil {
return err
}
} else {
Expand Down Expand Up @@ -260,7 +260,7 @@ func (d *InternalDocument) RootObject() *crdt.Object {
return d.root.Object()
}

func (d *InternalDocument) applySnapshot(snapshot []byte, serverSeq int64, vector time.VersionVector) error {
func (d *InternalDocument) applySnapshot(snapshot []byte, vector time.VersionVector) error {
rootObj, presences, err := converter.BytesToSnapshot(snapshot)
if err != nil {
return err
Expand All @@ -269,8 +269,14 @@ func (d *InternalDocument) applySnapshot(snapshot []byte, serverSeq int64, vecto
d.root = crdt.NewRoot(rootObj)
d.presences = presences

// TODO(hackerwins): We need to check we can use serverSeq as lamport timestamp.
d.changeID = d.changeID.SetClocks(serverSeq, vector)
// NOTE(chacha912): Documents created from snapshots were experiencing edit
// restrictions due to low lamport values.
// Previously, the code attempted to generate document lamport from ServerSeq.
// However, after aligning lamport logic with the original research paper,
// ServerSeq could potentially become smaller than the lamport value.
// To resolve this, we initialize document's lamport by using the highest
// lamport value stored in version vector as the starting point.
d.changeID = d.changeID.SetClocks(vector.MaxLamport(), vector)

return nil
}
Expand Down
93 changes: 93 additions & 0 deletions test/integration/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1447,4 +1447,97 @@ func TestGarbageCollection(t *testing.T) {
assert.Equal(t, `{"text":[{"val":"x"},{"val":"a"}]}`, d2.Marshal())
assert.Equal(t, `{"text":[{"val":"x"},{"val":"a"}]}`, d1.Marshal())
})

t.Run("snapshot version vector test", func(t *testing.T) {
clients := activeClients(t, 3)
c1, c2, c3 := clients[0], clients[1], clients[2]
defer deactivateAndCloseClients(t, clients)

ctx := context.Background()

d1 := document.New(helper.TestDocKey(t))
err := c1.Attach(ctx, d1)
assert.NoError(t, err)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewText("text").Edit(0, 0, "a")
return nil
})
assert.NoError(t, err)
err = c1.Sync(ctx)
assert.NoError(t, err)

d2 := document.New(helper.TestDocKey(t))
err = c2.Attach(ctx, d2)
assert.NoError(t, err)

d3 := document.New(helper.TestDocKey(t))
err = c3.Attach(ctx, d3)
assert.NoError(t, err)

err = c1.Sync(ctx)
assert.NoError(t, err)
err = c2.Sync(ctx)
assert.NoError(t, err)
assert.Equal(t, `{"text":[{"val":"a"}]}`, d1.Marshal())
assert.Equal(t, `{"text":[{"val":"a"}]}`, d2.Marshal())
assert.Equal(t, `{"text":[{"val":"a"}]}`, d3.Marshal())
assert.Equal(t, true, checkVV(d1.VersionVector(), versionOf(d1.ActorID(), 4), versionOf(d2.ActorID(), 1), versionOf(d3.ActorID(), 1)))
assert.Equal(t, true, checkVV(d2.VersionVector(), versionOf(d1.ActorID(), 2), versionOf(d2.ActorID(), 4), versionOf(d3.ActorID(), 1)))
assert.Equal(t, true, checkVV(d3.VersionVector(), versionOf(d1.ActorID(), 2), versionOf(d2.ActorID(), 1), versionOf(d3.ActorID(), 4)))

// 01. Update changes over snapshot threshold.
for i := 0; i <= int(helper.SnapshotThreshold)/2; i++ {
err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.GetText("text").Edit(0, 0, strconv.Itoa(i))
return nil
})
assert.NoError(t, err)
err = c1.Sync(ctx)
assert.NoError(t, err)
err = c2.Sync(ctx)
assert.NoError(t, err)

err = d2.Update(func(root *json.Object, p *presence.Presence) error {
root.GetText("text").Edit(0, 0, strconv.Itoa(i))
return nil
})
assert.NoError(t, err)
err = c2.Sync(ctx)
assert.NoError(t, err)
err = c1.Sync(ctx)
assert.NoError(t, err)
}
assert.Equal(t, true, checkVV(d1.VersionVector(), versionOf(d1.ActorID(), 28), versionOf(d2.ActorID(), 27), versionOf(d3.ActorID(), 1)))
assert.Equal(t, true, checkVV(d2.VersionVector(), versionOf(d1.ActorID(), 25), versionOf(d2.ActorID(), 27), versionOf(d3.ActorID(), 1)))
assert.Equal(t, true, checkVV(d3.VersionVector(), versionOf(d1.ActorID(), 2), versionOf(d2.ActorID(), 1), versionOf(d3.ActorID(), 4)))

// 02. Makes local changes then pull a snapshot from the server.
err = d3.Update(func(root *json.Object, p *presence.Presence) error {
root.GetText("text").Edit(0, 0, "c")
return nil
})
assert.NoError(t, err)
err = c3.Sync(ctx)
assert.NoError(t, err)
assert.Equal(t, true, checkVV(d3.VersionVector(), versionOf(d1.ActorID(), 25), versionOf(d2.ActorID(), 27), versionOf(d3.ActorID(), 30)))
assert.Equal(t, `{"text":[{"val":"5"},{"val":"5"},{"val":"4"},{"val":"4"},{"val":"3"},{"val":"3"},{"val":"2"},{"val":"2"},{"val":"1"},{"val":"1"},{"val":"0"},{"val":"c"},{"val":"0"},{"val":"a"}]}`, d3.Marshal())

// 03. Delete text after receiving the snapshot.
err = d3.Update(func(root *json.Object, p *presence.Presence) error {
root.GetText("text").Edit(1, 3, "")
return nil
})
assert.NoError(t, err)
assert.Equal(t, `{"text":[{"val":"5"},{"val":"4"},{"val":"3"},{"val":"3"},{"val":"2"},{"val":"2"},{"val":"1"},{"val":"1"},{"val":"0"},{"val":"c"},{"val":"0"},{"val":"a"}]}`, d3.Marshal())

err = c3.Sync(ctx)
assert.NoError(t, err)
err = c2.Sync(ctx)
assert.NoError(t, err)
err = c1.Sync(ctx)
assert.NoError(t, err)
assert.Equal(t, `{"text":[{"val":"5"},{"val":"4"},{"val":"3"},{"val":"3"},{"val":"2"},{"val":"2"},{"val":"1"},{"val":"1"},{"val":"0"},{"val":"c"},{"val":"0"},{"val":"a"}]}`, d2.Marshal())
assert.Equal(t, `{"text":[{"val":"5"},{"val":"4"},{"val":"3"},{"val":"3"},{"val":"2"},{"val":"2"},{"val":"1"},{"val":"1"},{"val":"0"},{"val":"c"},{"val":"0"},{"val":"a"}]}`, d1.Marshal())
})
}

0 comments on commit e3045dc

Please sign in to comment.