Skip to content

Commit

Permalink
CBG-3764 create multi-xattr subdoc APIs (#6739)
Browse files Browse the repository at this point in the history
Co-authored-by: adamcfraser <[email protected]>
  • Loading branch information
torcolvin and adamcfraser authored Mar 27, 2024
1 parent ab3df1d commit 8b976f5
Show file tree
Hide file tree
Showing 40 changed files with 1,023 additions and 760 deletions.
4 changes: 2 additions & 2 deletions auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func (auth *Authenticator) Save(p Principal) error {
return err
}

casOut, writeErr := auth.datastore.WriteCas(p.DocID(), 0, 0, p.Cas(), p, 0)
casOut, writeErr := auth.datastore.WriteCas(p.DocID(), 0, p.Cas(), p, 0)
if writeErr != nil {
return writeErr
}
Expand All @@ -512,7 +512,7 @@ func (auth *Authenticator) Save(p Principal) error {
// Used for resync
func (auth *Authenticator) UpdateSequenceNumber(p Principal, seq uint64) error {
p.SetSequence(seq)
casOut, writeErr := auth.datastore.WriteCas(p.DocID(), 0, 0, p.Cas(), p, 0)
casOut, writeErr := auth.datastore.WriteCas(p.DocID(), 0, p.Cas(), p, 0)
if writeErr != nil {
return writeErr
}
Expand Down
562 changes: 345 additions & 217 deletions base/bucket_gocb_test.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions base/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ func (b *GocbV2Bucket) IsSupported(feature sgbucket.BucketStoreFeature) bool {
return false
}
return len(agent.N1qlEps()) > 0
// added in Couchbase Server 6.6
case sgbucket.BucketStoreFeatureCreateDeletedWithXattr:
status, err := b.bucket.Internal().CapabilityStatus(gocb.CapabilityCreateAsDeleted)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion base/collection_gocb.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (c *Collection) SetRaw(k string, exp uint32, opts *sgbucket.UpsertOptions,
return err
}

func (c *Collection) WriteCas(k string, flags int, exp uint32, cas uint64, v interface{}, opt sgbucket.WriteOptions) (casOut uint64, err error) {
func (c *Collection) WriteCas(k string, exp uint32, cas uint64, v interface{}, opt sgbucket.WriteOptions) (casOut uint64, err error) {
c.Bucket.waitForAvailKvOp()
defer c.Bucket.releaseKvOp()

Expand Down
353 changes: 195 additions & 158 deletions base/collection_xattr.go

Large diffs are not rendered by default.

140 changes: 63 additions & 77 deletions base/collection_xattr_common.go

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion base/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@ var (
ErrPathNotFound = sgbucket.ErrPathNotFound
ErrPathExists = sgbucket.ErrPathExists

// ErrXattrNotFound is returned if a requested xattr is not present on a DCP event
// ErrXattrNotFound is returned if all requested xattrs are not present
ErrXattrNotFound = &sgError{"Xattr Not Found"}

// ErrXattrPartialFound is returned if only a subset of requested xattrs are found
ErrXattrPartialFound = &sgError{"Some Requested Xattrs Not Found"}

// ErrXattrInvalidLen is returned if the xattr is corrupt.
ErrXattrInvalidLen = &sgError{"Xattr stream length"}

Expand Down
2 changes: 1 addition & 1 deletion base/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func (dh *documentBackedListener) updateNodeList(ctx context.Context, nodeID str

InfofCtx(ctx, KeyCluster, "Updating nodeList document (%s) with node IDs: %v", dh.nodeListKey, dh.nodeIDs)

casOut, err := dh.datastore.WriteCas(dh.nodeListKey, 0, 0, dh.cas, dh.nodeIDs, 0)
casOut, err := dh.datastore.WriteCas(dh.nodeListKey, 0, dh.cas, dh.nodeIDs, 0)

if err == nil { // Successful update
dh.cas = casOut
Expand Down
82 changes: 27 additions & 55 deletions base/leaky_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ func (lds *LeakyDataStore) GetRaw(k string) (v []byte, cas uint64, err error) {
}
return lds.dataStore.GetRaw(k)
}
func (lds *LeakyDataStore) GetWithXattr(ctx context.Context, k string, xattr string, userXattrKey string, rv interface{}, xv interface{}, uxv interface{}) (cas uint64, err error) {
func (lds *LeakyDataStore) GetWithXattrs(ctx context.Context, k string, xattrKeys []string) (body []byte, xattrs map[string][]byte, cas uint64, err error) {
if lds.config.GetWithXattrCallback != nil {
if err := lds.config.GetWithXattrCallback(k); err != nil {
return 0, err
return nil, nil, 0, err
}
}
return lds.dataStore.GetWithXattr(ctx, k, xattr, userXattrKey, rv, xv, uxv)
return lds.dataStore.GetWithXattrs(ctx, k, xattrKeys)
}

func (lds *LeakyDataStore) GetAndTouchRaw(k string, exp uint32) (v []byte, cas uint64, err error) {
Expand Down Expand Up @@ -134,8 +134,8 @@ func (lds *LeakyDataStore) Delete(k string) error {
func (lds *LeakyDataStore) Remove(k string, cas uint64) (casOut uint64, err error) {
return lds.dataStore.Remove(k, cas)
}
func (lds *LeakyDataStore) WriteCas(k string, flags int, exp uint32, cas uint64, v interface{}, opt sgbucket.WriteOptions) (uint64, error) {
return lds.dataStore.WriteCas(k, flags, exp, cas, v, opt)
func (lds *LeakyDataStore) WriteCas(k string, exp uint32, cas uint64, v interface{}, opt sgbucket.WriteOptions) (uint64, error) {
return lds.dataStore.WriteCas(k, exp, cas, v, opt)
}
func (lds *LeakyDataStore) Update(k string, exp uint32, callback sgbucket.UpdateFunc) (casOut uint64, err error) {
if lds.config.UpdateCallback != nil {
Expand Down Expand Up @@ -244,64 +244,52 @@ func (lds *LeakyDataStore) GetMaxVbno() (uint16, error) {
return lds.bucket.GetMaxVbno()
}

func (lds *LeakyDataStore) WriteCasWithXattr(ctx context.Context, k string, xattr string, exp uint32, cas uint64, v interface{}, xv interface{}, opts *sgbucket.MutateInOptions) (casOut uint64, err error) {
return lds.dataStore.WriteCasWithXattr(ctx, k, xattr, exp, cas, v, xv, opts)
}

func (lds *LeakyDataStore) WriteWithXattr(ctx context.Context, k string, xattrKey string, exp uint32, cas uint64, value []byte, xattrValue []byte, isDelete bool, deleteBody bool, opts *sgbucket.MutateInOptions) (casOut uint64, err error) {
func (lds *LeakyDataStore) WriteWithXattrs(ctx context.Context, k string, exp uint32, cas uint64, value []byte, xattrs map[string][]byte, opts *sgbucket.MutateInOptions) (casOut uint64, err error) {
if lds.config.WriteWithXattrCallback != nil {
lds.config.WriteWithXattrCallback(k)
}
return lds.dataStore.WriteWithXattr(ctx, k, xattrKey, exp, cas, value, xattrValue, isDelete, deleteBody, opts)
return lds.dataStore.WriteWithXattrs(ctx, k, exp, cas, value, xattrs, opts)
}

func (lds *LeakyDataStore) WriteUpdateWithXattr(ctx context.Context, k string, xattr string, userXattrKey string, exp uint32, previous *sgbucket.BucketDocument, opts *sgbucket.MutateInOptions, callback sgbucket.WriteUpdateWithXattrFunc) (casOut uint64, err error) {
func (lds *LeakyDataStore) WriteUpdateWithXattrs(ctx context.Context, k string, xattrKeys []string, exp uint32, previous *sgbucket.BucketDocument, opts *sgbucket.MutateInOptions, callback sgbucket.WriteUpdateWithXattrsFunc) (casOut uint64, err error) {
if lds.config.UpdateCallback != nil {
wrapperCallback := func(current []byte, xattr []byte, userXattr []byte, cas uint64) (updated []byte, updatedXattr []byte, deletedDoc bool, expiry *uint32, updatedSpec []sgbucket.MacroExpansionSpec, err error) {
updated, updatedXattr, deletedDoc, expiry, updatedSpec, err = callback(current, xattr, userXattr, cas)
wrapperCallback := func(current []byte, xattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) {
updatedDoc, err := callback(current, xattrs, cas)
lds.config.UpdateCallback(k)
return updated, updatedXattr, deletedDoc, expiry, updatedSpec, err
return updatedDoc, err
}
return lds.dataStore.WriteUpdateWithXattr(ctx, k, xattr, userXattrKey, exp, previous, opts, wrapperCallback)
return lds.dataStore.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, previous, opts, wrapperCallback)
}
return lds.dataStore.WriteUpdateWithXattr(ctx, k, xattr, userXattrKey, exp, previous, opts, callback)
return lds.dataStore.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, previous, opts, callback)
}

func (lds *LeakyDataStore) SetXattr(ctx context.Context, k string, xattrKey string, xv []byte) (casOut uint64, err error) {
func (lds *LeakyDataStore) SetXattrs(ctx context.Context, k string, xv map[string][]byte) (casOut uint64, err error) {
if lds.config.SetXattrCallback != nil {
if err := lds.config.SetXattrCallback(k); err != nil {
return 0, err
}
}
return lds.dataStore.SetXattr(ctx, k, xattrKey, xv)
return lds.dataStore.SetXattrs(ctx, k, xv)
}

func (lds *LeakyDataStore) RemoveXattr(ctx context.Context, k string, xattrKey string, cas uint64) (err error) {
return lds.dataStore.RemoveXattr(ctx, k, xattrKey, cas)
func (lds *LeakyDataStore) RemoveXattrs(ctx context.Context, k string, xattrKeys []string, cas uint64) (err error) {
return lds.dataStore.RemoveXattrs(ctx, k, xattrKeys, cas)
}

func (lds *LeakyDataStore) DeleteXattrs(ctx context.Context, k string, xattrKeys ...string) (err error) {
return lds.dataStore.DeleteXattrs(ctx, k, xattrKeys...)
func (lds *LeakyDataStore) DeleteSubDocPaths(ctx context.Context, k string, xattrKeys ...string) (err error) {
return lds.dataStore.DeleteSubDocPaths(ctx, k, xattrKeys...)
}

func (lds *LeakyDataStore) SubdocInsert(ctx context.Context, docID string, fieldPath string, cas uint64, value interface{}) error {
return lds.dataStore.SubdocInsert(ctx, docID, fieldPath, cas, value)
}

func (lds *LeakyDataStore) DeleteWithXattr(ctx context.Context, k string, xattr string) error {
return lds.dataStore.DeleteWithXattr(ctx, k, xattr)
}

func (lds *LeakyDataStore) WriteUserXattr(docKey string, xattrKey string, xattrVal interface{}) (uint64, error) {
return lds.dataStore.WriteUserXattr(docKey, xattrKey, xattrVal)
}

func (lds *LeakyDataStore) DeleteUserXattr(docKey string, xattrKey string) (uint64, error) {
return lds.dataStore.DeleteUserXattr(docKey, xattrKey)
func (lds *LeakyDataStore) DeleteWithXattrs(ctx context.Context, k string, xattrKeys []string) error {
return lds.dataStore.DeleteWithXattrs(ctx, k, xattrKeys)
}

func (lds *LeakyDataStore) GetXattr(ctx context.Context, k string, xattr string, xv interface{}) (cas uint64, err error) {
return lds.dataStore.GetXattr(ctx, k, xattr, xv)
func (lds *LeakyDataStore) GetXattrs(ctx context.Context, k string, xattrKeys []string) (xattrs map[string][]byte, cas uint64, err error) {
return lds.dataStore.GetXattrs(ctx, k, xattrKeys)
}

func (lds *LeakyDataStore) GetSubDocRaw(ctx context.Context, k string, subdocKey string) ([]byte, uint64, error) {
Expand Down Expand Up @@ -342,29 +330,13 @@ func (lds *LeakyDataStore) IsSupported(feature sgbucket.BucketStoreFeature) bool
return lds.dataStore.IsSupported(feature)
}

func (lds *LeakyDataStore) SubdocSetXattr(ctx context.Context, k string, xattrKey string, xv interface{}) (casOut uint64, err error) {
raw, err := JSONMarshal(xv)
if err == nil {
casOut, err = lds.dataStore.SetXattr(ctx, k, xattrKey, raw)
}
return
}

func (lds *LeakyDataStore) UpdateXattr(ctx context.Context, k string, xattrKey string, exp uint32, cas uint64, xv interface{}, opts *sgbucket.MutateInOptions) (casOut uint64, err error) {
return lds.dataStore.UpdateXattr(ctx, k, xattrKey, exp, cas, xv, opts)
func (lds *LeakyDataStore) UpdateXattrs(ctx context.Context, k string, exp uint32, cas uint64, xv map[string][]byte, opts *sgbucket.MutateInOptions) (casOut uint64, err error) {
return lds.dataStore.UpdateXattrs(ctx, k, exp, cas, xv, opts)
}

func (lds *LeakyDataStore) UpdateXattrDeleteBody(ctx context.Context, k, xattrKey string, exp uint32, cas uint64, xv interface{}, opts *sgbucket.MutateInOptions) (casOut uint64, err error) {
return lds.dataStore.UpdateXattrDeleteBody(ctx, k, xattrKey, exp, cas, xv, opts)

}

func (lds *LeakyDataStore) DeleteXattr(ctx context.Context, k string, xattrKey string, cas uint64) error {
return lds.dataStore.RemoveXattr(ctx, k, xattrKey, cas)
}
func (lds *LeakyDataStore) WriteTombstoneWithXattrs(ctx context.Context, k string, exp uint32, cas uint64, xv map[string][]byte, deleteBody bool, opts *sgbucket.MutateInOptions) (casOut uint64, err error) {
return lds.dataStore.WriteTombstoneWithXattrs(ctx, k, exp, cas, xv, deleteBody, opts)

func (lds *LeakyDataStore) DeleteBody(ctx context.Context, k string, xattrKey string, exp uint32, cas uint64, opts *sgbucket.MutateInOptions) (casOut uint64, err error) {
return lds.dataStore.DeleteBody(ctx, k, xattrKey, exp, cas, opts)
}

func (lds *LeakyDataStore) GetSpec() BucketSpec {
Expand Down
6 changes: 3 additions & 3 deletions base/rosmar_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (c *RosmarCluster) InsertMetadataDocument(ctx context.Context, location, ke
}
defer bucket.Close(ctx)

return bucket.DefaultDataStore().WriteCas(key, 0, 0, 0, value, 0)
return bucket.DefaultDataStore().WriteCas(key, 0, 0, value, 0)
}

// WriteMetadataDocument writes a metadata document, and fails on CAS mismatch
Expand All @@ -65,7 +65,7 @@ func (c *RosmarCluster) WriteMetadataDocument(ctx context.Context, location, doc
}
defer bucket.Close(ctx)

return bucket.DefaultDataStore().WriteCas(docID, 0, 0, cas, value, 0)
return bucket.DefaultDataStore().WriteCas(docID, 0, cas, value, 0)
}

// TouchMetadataDocument sets the specified property in a bootstrap metadata document for a given bucket and key. Used to
Expand Down Expand Up @@ -124,7 +124,7 @@ func (c *RosmarCluster) UpdateMetadataDocument(ctx context.Context, location, do
return removeCasOut, nil
}

replaceCfgCasOut, err := bucket.DefaultDataStore().WriteCas(docID, 0, 0, cas, newConfig, 0)
replaceCfgCasOut, err := bucket.DefaultDataStore().WriteCas(docID, 0, cas, newConfig, 0)
if err != nil {
if errors.As(err, &sgbucket.CasMismatchErr{}) {
// retry on cas failure
Expand Down
2 changes: 1 addition & 1 deletion base/sg_cluster_cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (c *CfgSG) Set(cfgKey string, val []byte, cas uint64) (uint64, error) {
}

bucketKey := c.sgCfgBucketKey(cfgKey)
casOut, err := c.datastore.WriteCas(bucketKey, 0, 0, cas, val, 0)
casOut, err := c.datastore.WriteCas(bucketKey, 0, cas, val, 0)

if IsCasMismatch(err) {
InfofCtx(c.loggingCtx, KeyCluster, "cfg_sg: Set, ErrKeyExists key: %s, cas: %d", cfgKey, cas)
Expand Down
8 changes: 5 additions & 3 deletions db/attachment_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ func attachmentCompactMarkPhase(ctx context.Context, dataStore base.DataStore, c

for attachmentName, attachmentDocID := range attachmentKeys {
// Stamp the current compaction ID into the attachment xattr. This is performing the actual marking
_, err = dataStore.SetXattr(ctx, attachmentDocID, getCompactionIDSubDocPath(compactionID), []byte(strconv.Itoa(int(time.Now().Unix()))))
_, err = dataStore.SetXattrs(ctx, attachmentDocID, map[string][]byte{
getCompactionIDSubDocPath(compactionID): []byte(strconv.Itoa(int(time.Now().Unix())))},
)

// If an error occurs while stamping in that ID we need to fail this process and then the entire compaction
// process. Otherwise, an attachment could end up getting erroneously deleted in the later sweep phase.
Expand Down Expand Up @@ -469,7 +471,7 @@ func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore
// Note that if this operation fails with a cas mismatch we will fall through to the following per ID
// delete. This can occur if another compact process ends up mutating / deleting the xattr.
if len(compactIDSyncMap) == len(toDeleteCompactIDPaths) {
err = dataStore.RemoveXattr(ctx, docID, base.AttachmentCompactionXattrName, event.Cas)
err = dataStore.RemoveXattrs(ctx, docID, []string{base.AttachmentCompactionXattrName}, event.Cas)
if err == nil {
return true
}
Expand All @@ -481,7 +483,7 @@ func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore
}

// If we only want to remove select compact IDs delete each one through a subdoc operation
err = dataStore.DeleteXattrs(ctx, docID, toDeleteCompactIDPaths...)
err = dataStore.DeleteSubDocPaths(ctx, docID, toDeleteCompactIDPaths...)
if err != nil && !errors.Is(err, base.ErrXattrNotFound) {
base.WarnfCtx(ctx, "[%s] Failed to delete compaction IDs %s for doc %s: %v", compactionLoggingID, strings.Join(toDeleteCompactIDPaths, ","), base.UD(docID), err)
return true
Expand Down
Loading

0 comments on commit 8b976f5

Please sign in to comment.