Skip to content

Commit

Permalink
Fix difference in KV CAS operations for R1 vs R3 (#5841)
Browse files Browse the repository at this point in the history
A regression was introduced in
#5821 where CAS operations on
a R1 stream would succeed even if they should be rejected. Whereas on
R3/clustered they would be rejected.

Resolves: #5840

Signed-off-by: Maurice van Veen <[email protected]>

---------

Signed-off-by: Waldemar Quevedo <[email protected]>
Signed-off-by: Maurice van Veen <[email protected]>
Co-authored-by: Waldemar Quevedo <[email protected]>
  • Loading branch information
MauriceVanVeen and wallyqs committed Aug 28, 2024
1 parent e7c88e8 commit 3ffe823
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 21 deletions.
39 changes: 19 additions & 20 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2913,27 +2913,26 @@ func TestJetStreamClusterKeyValueLastSeqMismatch(t *testing.T) {
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "mismatch",
Replicas: 3,
})
require_NoError(t, err)

revision, err := kv.Create("foo", []byte("1"))
require_NoError(t, err)
require_Equal(t, revision, 1)
for _, r := range []int{1, 3} {
t.Run(fmt.Sprintf("R=%d", r), func(t *testing.T) {
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: fmt.Sprintf("mismatch_%v", r),
Replicas: r,
})
require_NoError(t, err)

revision, err = kv.Create("bar", []byte("2"))
require_NoError(t, err)
require_Equal(t, revision, 2)
revision, err := kv.Create("foo", []byte("1"))
require_NoError(t, err)
require_Equal(t, revision, 1)

// Now delete foo from sequence 1.
// This needs to be low level remove (or system level) to test the condition we want here.
err = js.DeleteMsg("KV_mismatch", 1)
require_Error(t, err)
revision, err = kv.Create("bar", []byte("2"))
require_NoError(t, err)
require_Equal(t, revision, 2)

// Now say we want to update baz but iff last was revision 1.
_, err = kv.Update("baz", []byte("3"), uint64(1))
require_Error(t, err)
require_Equal(t, err.Error(), `nats: wrong last sequence: 0`)
// Now say we want to update baz but iff last was revision 1.
_, err = kv.Update("baz", []byte("3"), uint64(1))
require_Error(t, err)
require_Equal(t, err.Error(), `nats: wrong last sequence: 0`)
})
}
}
2 changes: 1 addition & 1 deletion server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4400,7 +4400,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if err == ErrStoreMsgNotFound {
if seq == 0 {
fseq, err = 0, nil
} else {
} else if mset.isClustered() {
// Do not bump clfs in case message was not found and could have been deleted.
var ss StreamState
store.FastState(&ss)
Expand Down

0 comments on commit 3ffe823

Please sign in to comment.