Skip to content

Commit

Permalink
storage: plumb AdminChangeReplicas request
Browse files Browse the repository at this point in the history
Plumbing an `AdminChangeReplicas` request. Refactoring the `testcluster` code
to issue this operation rather than calling into the `Replica` directly.

This request will be used to implement `SCATTER` and `TESTING_RELOCATE`
(cockroachdb#13665).
  • Loading branch information
RaduBerinde committed Mar 23, 2017
1 parent 1d5737c commit 9e2fa01
Show file tree
Hide file tree
Showing 11 changed files with 1,281 additions and 782 deletions.
22 changes: 22 additions & 0 deletions pkg/internal/client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ func (b *Batch) fillResults() error {
case *roachpb.AdminMergeRequest:
case *roachpb.AdminSplitRequest:
case *roachpb.AdminTransferLeaseRequest:
case *roachpb.AdminChangeReplicasRequest:
case *roachpb.HeartbeatTxnRequest:
case *roachpb.GCRequest:
case *roachpb.PushTxnRequest:
Expand Down Expand Up @@ -608,6 +609,27 @@ func (b *Batch) adminTransferLease(key interface{}, target roachpb.StoreID) {
b.initResult(1, 0, notRaw, nil)
}

// adminChangeReplicas is only exported on DB. It is here for symmetry with the
// other operations.
func (b *Batch) adminChangeReplicas(
key interface{}, changeType roachpb.ReplicaChangeType, targets []roachpb.ReplicationTarget,
) {
k, err := marshalKey(key)
if err != nil {
b.initResult(0, 0, notRaw, err)
return
}
req := &roachpb.AdminChangeReplicasRequest{
Span: roachpb.Span{
Key: k,
},
ChangeType: changeType,
Targets: targets,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

// writeBatch is only exported on DB.
func (b *Batch) writeBatch(s, e interface{}, data []byte) {
begin, err := marshalKey(s)
Expand Down
12 changes: 12 additions & 0 deletions pkg/internal/client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,18 @@ func (db *DB) AdminTransferLease(
return getOneErr(db.Run(ctx, b), b)
}

// AdminChangeReplicas adds or removes a set of replicas for a range.
func (db *DB) AdminChangeReplicas(
ctx context.Context,
key interface{},
changeType roachpb.ReplicaChangeType,
targets []roachpb.ReplicationTarget,
) error {
b := &Batch{}
b.adminChangeReplicas(key, changeType, targets)
return getOneErr(db.Run(ctx, b), b)
}

// CheckConsistency runs a consistency check on all the ranges containing
// the key span. It logs a diff of all the keys that are inconsistent
// when withDiff is set to true.
Expand Down
1 change: 1 addition & 0 deletions pkg/internal/client/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ func TestCommonMethods(t *testing.T) {
{dbType, "AdminMerge"}: {},
{dbType, "AdminSplit"}: {},
{dbType, "AdminTransferLease"}: {},
{dbType, "AdminChangeReplicas"}: {},
{dbType, "CheckConsistency"}: {},
{dbType, "Run"}: {},
{dbType, "Txn"}: {},
Expand Down
42 changes: 26 additions & 16 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,9 @@ func (*AdminMergeRequest) Method() Method { return AdminMerge }
// Method implements the Request interface.
func (*AdminTransferLeaseRequest) Method() Method { return AdminTransferLease }

// Method implements the Request interface.
func (*AdminChangeReplicasRequest) Method() Method { return AdminChangeReplicas }

// Method implements the Request interface.
func (*HeartbeatTxnRequest) Method() Method { return HeartbeatTxn }

Expand Down Expand Up @@ -586,6 +589,12 @@ func (atlr *AdminTransferLeaseRequest) ShallowCopy() Request {
return &shallowCopy
}

// ShallowCopy implements the Request interface.
func (acrr *AdminChangeReplicasRequest) ShallowCopy() Request {
shallowCopy := *acrr
return &shallowCopy
}

// ShallowCopy implements the Request interface.
func (htr *HeartbeatTxnRequest) ShallowCopy() Request {
shallowCopy := *htr
Expand Down Expand Up @@ -863,22 +872,23 @@ func (drr *DeleteRangeRequest) flags() int {
}
return isWrite | isTxn | isTxnWrite | isRange
}
func (*ScanRequest) flags() int { return isRead | isRange | isTxn }
func (*ReverseScanRequest) flags() int { return isRead | isRange | isReverse | isTxn }
func (*BeginTransactionRequest) flags() int { return isWrite | isTxn }
func (*EndTransactionRequest) flags() int { return isWrite | isTxn | isAlone }
func (*AdminSplitRequest) flags() int { return isAdmin | isAlone }
func (*AdminMergeRequest) flags() int { return isAdmin | isAlone }
func (*AdminTransferLeaseRequest) flags() int { return isAdmin | isAlone }
func (*HeartbeatTxnRequest) flags() int { return isWrite | isTxn }
func (*GCRequest) flags() int { return isWrite | isRange }
func (*PushTxnRequest) flags() int { return isWrite | isAlone }
func (*QueryTxnRequest) flags() int { return isRead | isAlone }
func (*RangeLookupRequest) flags() int { return isRead }
func (*ResolveIntentRequest) flags() int { return isWrite }
func (*ResolveIntentRangeRequest) flags() int { return isWrite | isRange }
func (*NoopRequest) flags() int { return isRead } // slightly special
func (*TruncateLogRequest) flags() int { return isWrite | isNonKV }
func (*ScanRequest) flags() int { return isRead | isRange | isTxn }
func (*ReverseScanRequest) flags() int { return isRead | isRange | isReverse | isTxn }
func (*BeginTransactionRequest) flags() int { return isWrite | isTxn }
func (*EndTransactionRequest) flags() int { return isWrite | isTxn | isAlone }
func (*AdminSplitRequest) flags() int { return isAdmin | isAlone }
func (*AdminMergeRequest) flags() int { return isAdmin | isAlone }
func (*AdminTransferLeaseRequest) flags() int { return isAdmin | isAlone }
func (*AdminChangeReplicasRequest) flags() int { return isAdmin | isAlone }
func (*HeartbeatTxnRequest) flags() int { return isWrite | isTxn }
func (*GCRequest) flags() int { return isWrite | isRange }
func (*PushTxnRequest) flags() int { return isWrite | isAlone }
func (*QueryTxnRequest) flags() int { return isRead | isAlone }
func (*RangeLookupRequest) flags() int { return isRead }
func (*ResolveIntentRequest) flags() int { return isWrite }
func (*ResolveIntentRangeRequest) flags() int { return isWrite | isRange }
func (*NoopRequest) flags() int { return isRead } // slightly special
func (*TruncateLogRequest) flags() int { return isWrite | isNonKV }

// MergeRequests are considered "non KV" because they do not need to be gated
// by the command queue (reordering is ok) and they operate on non-MVCC data so
Expand Down
Loading

0 comments on commit 9e2fa01

Please sign in to comment.