Skip to content

Commit

Permalink
CBG-3392 match CAS checks for xattrs like CBS does (#18)
Browse files Browse the repository at this point in the history
This matches CBS behavior and allows all SG tests to pass.

I found the logic in `checkCASXattr` to be really confusing and I'm not
sure how to make it easier.

Formerly, a document was declared a tombstone based on whether it has
nil document body. However, having a nil document body is a valid state
for a document, so I added field to determine whether it is a tombstone.
I didn't write a schema migration to add this field since we have no
released version of SG that can use a persistent rosmar database.

I switched some bitflags to an options struct to make it easier to read
and more idiomatic in go, rather than add two new flags.

This passes all tests in Sync Gateway, including the ones that are
currently skipped.
  • Loading branch information
torcolvin authored Nov 21, 2023
1 parent adb4806 commit 9f20fac
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 40 deletions.
79 changes: 48 additions & 31 deletions collection+xattrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *Collection) SetXattr(
xv []byte,
) (casOut CAS, err error) {
traceEnter("SetXattr", "%q, %q", key, xattrKey)
casOut, err = c.writeWithXattr(key, nil, xattrKey, payload{marshaled: xv}, nil, nil, xNoOpts, nil)
casOut, err = c.writeWithXattr(key, nil, xattrKey, payload{marshaled: xv}, nil, nil, writeXattrOptions{}, nil)
traceExit("SetXattr", err, "0x%x", casOut)
return casOut, err
}
Expand All @@ -60,7 +60,7 @@ func (c *Collection) RemoveXattr(
cas CAS,
) error {
traceEnter("RemoveXattr", "%q, %q", key, xattrKey)
_, err := c.writeWithXattr(key, nil, xattrKey, payload{}, &cas, nil, xNoOpts, nil)
_, err := c.writeWithXattr(key, nil, xattrKey, payload{}, &cas, nil, writeXattrOptions{}, nil)
traceExit("RemoveXattr", err, "ok")
return err
}
Expand Down Expand Up @@ -103,7 +103,7 @@ func (c *Collection) WriteUserXattr(
xv interface{},
) (casOut CAS, err error) {
traceEnter("WriteUserXattr", "%q, %q, ...", key, xattrKey)
casOut, err = c.writeWithXattr(key, nil, xattrKey, payload{parsed: xv}, nil, nil, xNoOpts, nil)
casOut, err = c.writeWithXattr(key, nil, xattrKey, payload{parsed: xv}, nil, nil, writeXattrOptions{}, nil)
traceExit("WriteUserXattr", err, "0x%x", casOut)
return casOut, err
}
Expand All @@ -113,7 +113,7 @@ func (c *Collection) DeleteUserXattr(
xattrKey string,
) (casOut CAS, err error) {
traceEnter("DeleteUserXattr", "%q, %q", key, xattrKey)
casOut, err = c.writeWithXattr(key, nil, xattrKey, payload{}, nil, nil, xNoOpts, nil)
casOut, err = c.writeWithXattr(key, nil, xattrKey, payload{}, nil, nil, writeXattrOptions{}, nil)
traceExit("DeleteUserXattr", err, "0x%x", casOut)
return casOut, err
}
Expand Down Expand Up @@ -163,7 +163,7 @@ func (c *Collection) WriteWithXattr(
traceEnter("WriteWithXattr", "%q, %q, cas=%d, exp=%d, isDelete=%v, deleteBody=%v ...", key, xattrKey, cas, exp, isDelete, deleteBody)
expP := ifelse(opts != nil && opts.PreserveExpiry, nil, &exp)
vp := ifelse(value != nil || deleteBody, &payload{marshaled: value}, nil)
casOut, err = c.writeWithXattr(key, vp, xattrKey, payload{marshaled: xv}, &cas, expP, xNoOpts, opts)
casOut, err = c.writeWithXattr(key, vp, xattrKey, payload{marshaled: xv}, &cas, expP, writeXattrOptions{isDelete: isDelete, deleteBody: deleteBody}, opts)
traceExit("WriteWithXattr", err, "0x%x", casOut)
return
}
Expand All @@ -185,7 +185,7 @@ func (c *Collection) WriteCasWithXattr(
if opts != nil && opts.PreserveExpiry {
expP = nil
}
casOut, err = c.writeWithXattr(key, vp, xattrKey, payload{parsed: xv}, &cas, expP, xNoOpts, opts)
casOut, err = c.writeWithXattr(key, vp, xattrKey, payload{parsed: xv}, &cas, expP, writeXattrOptions{}, opts)
traceExit("WriteCasWithXattr", err, "0x%x", casOut)
return
}
Expand Down Expand Up @@ -278,7 +278,7 @@ func (c *Collection) UpdateXattr(
) (casOut uint64, err error) {
traceEnter("UpdateXattr", "%q, %q, exp=%d, cas=0x%x ...", key, xattrKey, exp, cas)
defer func() { traceExit("UpdateXattr", err, "0x%x", casOut) }()
return c.writeWithXattr(key, nil, xattrKey, payload{parsed: xv}, &cas, &exp, xNoOpts, opts)
return c.writeWithXattr(key, nil, xattrKey, payload{parsed: xv}, &cas, &exp, writeXattrOptions{}, opts)
}

// Updates an xattr and deletes the body (making the doc a tombstone.)
Expand All @@ -293,7 +293,7 @@ func (c *Collection) UpdateXattrDeleteBody(
) (casOut uint64, err error) {
traceEnter("UpdateXattrDeleteBody", "%q, %q, exp=%d, cas=0x%x ...", key, xattrKey, exp, cas)
defer func() { traceExit("UpdateXattrDeleteBody", err, "0x%x", casOut) }()
return c.writeWithXattr(key, &payload{}, xattrKey, payload{parsed: xv}, &cas, &exp, xNoOpts, opts)
return c.writeWithXattr(key, &payload{}, xattrKey, payload{parsed: xv}, &cas, &exp, writeXattrOptions{}, opts)
}

// Deletes the document's body, and updates the CAS & CRC32 macros in the xattr.
Expand All @@ -307,7 +307,7 @@ func (c *Collection) DeleteBody(
) (casOut uint64, err error) {
traceEnter("DeleteBody", "%q, %q, exp=%d, cas=0x%x ...", key, xattrKey, exp, cas)
defer func() { traceExit("DeleteBody", err, "0x%x", casOut) }()
return c.writeWithXattr(key, &payload{}, xattrKey, payload{}, &cas, &exp, xPreserveXattr, opts)
return c.writeWithXattr(key, &payload{}, xattrKey, payload{}, &cas, &exp, writeXattrOptions{preserveXattr: true}, opts)
}

//////// INTERNALS:
Expand Down Expand Up @@ -383,14 +383,34 @@ func (c *Collection) _deleteBodyAndXattr(
}

// Option flags for writeWithXattr
type writeXattrOpts uint8
type writeXattrOptions struct {
insertDoc bool // Create new doc; fail if it already exists
insertXattr bool // Fail if `xattr` already exists
preserveXattr bool // Leave xattr value alone, just expand macros
isDelete bool // Allow ressurecting a tombstone
deleteBody bool // Delete the body along with updating tombstone
}

const (
xInsertDoc = writeXattrOpts(1 << iota) // Create new doc; fail if it already exists
xInsertXattr // Fail if `xattr` already exists
xPreserveXattr // Leave xattr value alone, just expand macros
xNoOpts = writeXattrOpts(0) // Default behavior
)
// checkCasXattr checks the cas supplied against the current cas of the document. e respresents the document in the bucket, and expectedCas is the expected value. Returns CasMismatchErr on an unsuccesful CAS check.
func checkCasXattr(hasPreviousDocBody bool, existingCas, expectedCas *CAS, isTombstone bool, opts writeXattrOptions) error {
// no cas supplied, nothing to check
if expectedCas == nil {
return nil
}
if !hasPreviousDocBody {
if opts.isDelete {
if opts.deleteBody {
return nil
}
} else if isTombstone {
return nil
}
}
if *existingCas == *expectedCas {
return nil
}
return sgbucket.CasMismatchErr{Expected: *expectedCas, Actual: *existingCas}
}

// Swiss Army knife method for modifying a document xattr, with or without changing the body.
func (c *Collection) writeWithXattr(
Expand All @@ -400,7 +420,7 @@ func (c *Collection) writeWithXattr(
xattrVal payload, // xattr value; a nil payload deletes the xattr
ifCas *CAS, // if non-nil, must match current CAS; 0 for insert
exp *Exp, // if non-nil, sets expiration to this value
opts writeXattrOpts, // option flags
opts writeXattrOptions, // option flags
mutateOpts *sgbucket.MutateInOptions, // expiry and macro expansion options
) (casOut CAS, err error) {
// Validate xattr key/value before going into the transaction:
Expand All @@ -419,27 +439,24 @@ func (c *Collection) writeWithXattr(
key: key,
cas: newCas,
}

var wasTombstone int
// First read the existing doc, if any:
row := txn.QueryRow(`SELECT value, isJSON, cas, exp, xattrs FROM documents WHERE collection=?1 AND key=?2`,
row := txn.QueryRow(`SELECT value, isJSON, cas, exp, xattrs, tombstone FROM documents WHERE collection=?1 AND key=?2`,
c.id, key)
var prevCas CAS
if err := scan(row, &e.value, &e.isJSON, &prevCas, &e.exp, &e.xattrs); err == nil {
if e.value == nil {
if err := scan(row, &e.value, &e.isJSON, &prevCas, &e.exp, &e.xattrs, &wasTombstone); err == nil {
if wasTombstone == 1 {
e.xattrs = nil // xattrs are cleared whenever resurrecting a tombstone
} else if opts&xInsertDoc != 0 {
} else if opts.insertDoc {
return nil, sgbucket.ErrKeyExists
}
} else if err != sql.ErrNoRows {
return nil, remapKeyError(err, key)
}

if ifCas != nil && *ifCas != prevCas {
if e.value == nil && *ifCas == 0 {
// It's OK to use CAS 0 to refer to a tombstone.
} else {
return nil, sgbucket.CasMismatchErr{Expected: *ifCas, Actual: prevCas}
}
err := checkCasXattr(e.value != nil, &prevCas, ifCas, wasTombstone == 1, opts)
if err != nil {
return nil, err
}

var xattrs semiParsedXattrs
Expand All @@ -466,9 +483,9 @@ func (c *Collection) writeWithXattr(
}
}

if (opts & xPreserveXattr) != 0 {
if opts.preserveXattr {
// The xPreserveXattr flag means to keep the xattr's value (but do macro expansion.)
if opts&xInsertXattr != 0 {
if opts.insertXattr {
return nil, fmt.Errorf("illegal options to rosmar.Collection.writeWithXattr")
}
existingVal, ok := xattrs[xattrKey]
Expand All @@ -483,7 +500,7 @@ func (c *Collection) writeWithXattr(

if !xattrVal.isNil() {
// Set xattr:
if (opts&xInsertXattr != 0) && xattrs[xattrKey] != nil {
if opts.insertXattr && xattrs[xattrKey] != nil {
return nil, sgbucket.ErrPathExists
}
// Expand any macros specified in the mutateOpts
Expand Down
27 changes: 19 additions & 8 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ func (c *Collection) GetRaw(key string) (val []byte, cas CAS, err error) {
return
}

// isTombstone returns true if the document is a tombstone.
func (c *Collection) isTombstone(q queryable, key string) bool {
row := q.QueryRow("SELECT 1 FROM documents WHERE collection=? AND key=? AND tombstone=1", c.id, key)
var i int
err := scan(row, &i)
return err == nil
}

func (c *Collection) getRaw(q queryable, key string) (val []byte, cas CAS, err error) {
row := q.QueryRow("SELECT value, cas FROM documents WHERE collection=? AND key=?", c.id, key)
if err = scan(row, &val, &cas); err != nil {
Expand Down Expand Up @@ -144,7 +152,7 @@ func (c *Collection) add(key string, exp Exp, val []byte, isJSON bool) (added bo
`INSERT INTO documents (collection,key,value,cas,exp,isJSON) VALUES (?1,?2,?3,?4,?5,?6)
ON CONFLICT(collection,key) DO
UPDATE SET value=?3, xattrs=null, cas=?4, exp=?5, isJSON=?6
WHERE value IS NULL`,
WHERE tombstone != 0`,
c.id, key, val, newCas, exp, isJSON)
if err != nil {
return
Expand Down Expand Up @@ -299,26 +307,30 @@ func (c *Collection) WriteCas(key string, flags int, exp Exp, cas CAS, val any,
}

err = c.withNewCas(func(txn *sql.Tx, newCas CAS) (*event, error) {
wasTombstone := false
if cas != 0 {
wasTombstone = c.isTombstone(txn, key)
}
exp = absoluteExpiry(exp)
var sql string
if (opt & sgbucket.Append) != 0 {
// Append:
sql = `UPDATE documents SET value=value || ?1, cas=?2, exp=?6, isJSON=0,
xattrs=iif(value is null, null, xattrs)
xattrs=iif(tombstone != 0, null, xattrs)
WHERE collection=?3 AND key=?4 AND cas=?5`
} else if (opt&sgbucket.AddOnly) != 0 || cas == 0 {
// Insert, but fall back to Update if the doc is a tombstone
sql = `INSERT INTO documents (collection, key, value, cas, exp, isJSON) VALUES(?3,?4,?1,?2,?6,?7)
ON CONFLICT(collection,key) DO
UPDATE SET value=?1, xattrs=null, cas=?2, exp=?6, isJSON=?7
WHERE value IS NULL`
if cas != 0 {
UPDATE SET value=?1, xattrs=null, cas=?2, exp=?6, isJSON=?7, tombstone=0
WHERE tombstone == 1`
if !wasTombstone && cas != 0 {
sql += ` AND cas=?5`
}
} else {
// Regular write:
sql = `UPDATE documents SET value=?1, cas=?2, exp=?6, isJSON=?7,
xattrs=iif(value is null, null, xattrs)
xattrs=iif(tombstone != 0, null, xattrs)
WHERE collection=?3 AND key=?4 AND cas=?5`
}
result, err := txn.Exec(sql, raw, newCas, c.id, key, cas, exp, isJSON)
Expand Down Expand Up @@ -400,10 +412,9 @@ func (c *Collection) remove(key string, ifCas *CAS) (casOut CAS, err error) {
rawXattrs = nil
}
}

// Now update, setting value=null, isJSON=false, and updating the xattrs:
_, err = txn.Exec(
`UPDATE documents SET value=null, cas=?1, exp=0, isJSON=0, xattrs=?2
`UPDATE documents SET value=null, cas=?1, exp=0, isJSON=0, xattrs=?2, tombstone=1
WHERE collection=?3 AND key=?4`,
newCas, rawXattrs, c.id, key)
if err == nil {
Expand Down
Loading

0 comments on commit 9f20fac

Please sign in to comment.