Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-3703 create a basic bucket to bucket rosmar implementation #6761

Merged
merged 1 commit into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 14 additions & 30 deletions xdcr/cbs_xdcr.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"net/url"
"strings"

sgbucket "github.com/couchbase/sg-bucket"
"github.com/couchbase/sync_gateway/base"
dto "github.com/prometheus/client_model/go"
)
Expand All @@ -27,27 +26,13 @@ const (
totalDocsWrittenStat = "xdcr_docs_written_total"
)

type serverMobileSetting uint8

const (
serverMobileOff = iota
serverMobileOn
)

const (
MobileOff = "Off"
MobileActive = "Active"
)

var MobileCompatibilityStrings = [...]string{MobileOff, MobileActive}

// CouchbaseServerXDCR implements a XDCR setup cluster on Couchbase Server.
type CouchbaseServerXDCR struct {
// couchbaseServerManager implements a XDCR setup cluster on Couchbase Server.
type couchbaseServerManager struct {
fromBucket *base.GocbV2Bucket
toBucket *base.GocbV2Bucket
replicationID string
MobileSetting string
filter string
mobileSetting MobileSetting
}

// isClusterPresent returns true if the XDCR cluster is present, false if it is not present, and an error if it could not be determined.
Expand Down Expand Up @@ -118,8 +103,8 @@ func createCluster(ctx context.Context, bucket *base.GocbV2Bucket) error {
return nil
}

// NewCouchbaseServerXDCR creates an instance of XDCR backed by Couchbase Server. This is not started until Start is called.
func NewCouchbaseServerXDCR(ctx context.Context, fromBucket *base.GocbV2Bucket, toBucket *base.GocbV2Bucket, mobileSetting serverMobileSetting) (*CouchbaseServerXDCR, error) {
// newCouchbaseServerManager creates an instance of XDCR backed by Couchbase Server. This is not started until Start is called.
func newCouchbaseServerManager(ctx context.Context, fromBucket *base.GocbV2Bucket, toBucket *base.GocbV2Bucket, opts XDCROptions) (*couchbaseServerManager, error) {
isPresent, err := isClusterPresent(ctx, fromBucket)
if err != nil {
return nil, err
Expand All @@ -135,15 +120,16 @@ func NewCouchbaseServerXDCR(ctx context.Context, fromBucket *base.GocbV2Bucket,
if err != nil {
return nil, err
}
return &CouchbaseServerXDCR{
return &couchbaseServerManager{
fromBucket: fromBucket,
toBucket: toBucket,
MobileSetting: MobileCompatibilityStrings[mobileSetting],
mobileSetting: opts.Mobile,
filter: opts.FilterExpression,
}, nil
}

// Start starts the XDCR replication.
func (x *CouchbaseServerXDCR) Start(ctx context.Context) error {
func (x *couchbaseServerManager) Start(ctx context.Context) error {
method := http.MethodPost
body := url.Values{}
body.Add("name", xdcrClusterName)
Expand All @@ -152,7 +138,7 @@ func (x *CouchbaseServerXDCR) Start(ctx context.Context) error {
body.Add("toCluster", xdcrClusterName)
body.Add("replicationType", "continuous")
// set the mobile flag on the replication
body.Add("mobile", x.MobileSetting)
body.Add("mobile", x.mobileSetting.String())
// add filter is needed
if x.filter != "" {
body.Add("filterExpression", x.filter)
Expand Down Expand Up @@ -181,7 +167,7 @@ func (x *CouchbaseServerXDCR) Start(ctx context.Context) error {
}

// Stop starts the XDCR replication and deletes the replication from Couchbase Server.
func (x *CouchbaseServerXDCR) Stop(ctx context.Context) error {
func (x *couchbaseServerManager) Stop(ctx context.Context) error {
method := http.MethodDelete
url := "/controller/cancelXDCR/" + url.PathEscape(x.replicationID)
output, statusCode, err := x.fromBucket.MgmtRequest(ctx, method, url, "application/x-www-form-urlencoded", nil)
Expand All @@ -196,7 +182,7 @@ func (x *CouchbaseServerXDCR) Stop(ctx context.Context) error {
}

// Stats returns the stats of the XDCR replication.
func (x *CouchbaseServerXDCR) Stats(ctx context.Context) (*Stats, error) {
func (x *couchbaseServerManager) Stats(ctx context.Context) (*Stats, error) {
mf, err := x.fromBucket.ServerMetrics(ctx)
if err != nil {
return nil, err
Expand All @@ -213,7 +199,7 @@ func (x *CouchbaseServerXDCR) Stats(ctx context.Context) (*Stats, error) {
return stats, nil
}

func (x *CouchbaseServerXDCR) getValue(metrics *dto.MetricFamily) (uint64, error) {
func (x *couchbaseServerManager) getValue(metrics *dto.MetricFamily) (uint64, error) {
outer:
for _, metric := range metrics.GetMetric() {
for _, label := range metric.Label {
Expand All @@ -239,6 +225,4 @@ outer:
return 0, fmt.Errorf("Could not find relevant value for metrics %v", metrics)
}

func BucketSupportsMobileXDCR(bucket base.Bucket) bool {
return bucket.IsSupported(sgbucket.BucketStoreFeatureMobileXDCR)
}
var _ Manager = &couchbaseServerManager{}
113 changes: 0 additions & 113 deletions xdcr/cbs_xdcr_test.go

This file was deleted.

61 changes: 57 additions & 4 deletions xdcr/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@
// Package xdcr implements an XDCR between different buckets. This is meant to be used for testing.
package xdcr

import "context"
import (
"context"
"fmt"

// Replication represents a bucket to bucket replication.
type Replication interface {
"github.com/couchbase/sync_gateway/base"
"github.com/couchbaselabs/rosmar"
)

// Manager represents a bucket to bucket replication.
type Manager interface {
// Start starts the replication.
Start(context.Context) error
// Stop terminates the replication.
Expand All @@ -21,4 +27,51 @@ type Replication interface {
Stats(context.Context) (*Stats, error)
}

var _ Replication = &CouchbaseServerXDCR{}
// MobileSetting represents the presence of "-mobile" flag for Couchbase Server replications. -mobile implies filtering sync metadata, handling version vectors, and filtering sync documents.
type MobileSetting uint8

const (
MobileOff = iota
MobileOn
)

// String returns the string representation of the XDCRMobileSetting, used for directly passing on to the Couchbase Server REST API.
func (s MobileSetting) String() string {
switch s {
case MobileOff:
return "Off"
case MobileOn:
return "Active"
default:
return "Unknown"
}
}

// XDCROptions represents the options for creating an XDCR.
type XDCROptions struct {
// FilterExpression is the filter expression to use for the replication.
FilterExpression string
// XDCR mobile setting defines whether XDCR replication will use -mobile setting behavior in Couchbase Server.
Mobile MobileSetting
}

// NewXDCR creates a new XDCR between two buckets.
func NewXDCR(ctx context.Context, fromBucket, toBucket base.Bucket, opts XDCROptions) (Manager, error) {
gocbFromBucket, err := base.AsGocbV2Bucket(fromBucket)
if err != nil {
rosmarFromBucket, ok := base.GetBaseBucket(fromBucket).(*rosmar.Bucket)
if !ok {
return nil, fmt.Errorf("fromBucket must be a *base.GocbV2Bucket or *rosmar.Bucket, got %T", fromBucket)
}
rosmarToBucket, ok := base.GetBaseBucket(toBucket).(*rosmar.Bucket)
if !ok {
return nil, fmt.Errorf("toBucket must be a *rosmar.Bucket since fromBucket was a rosmar bucket, got %T", toBucket)
}
return newRosmarManager(ctx, rosmarFromBucket, rosmarToBucket, opts)
}
gocbToBucket, err := base.AsGocbV2Bucket(toBucket)
if err != nil {
return nil, fmt.Errorf("toBucket must be a *base.GocbV2Bucket since fromBucket was a gocbBucket, got %T", toBucket)
}
return newCouchbaseServerManager(ctx, gocbFromBucket, gocbToBucket, opts)
}
Loading
Loading