Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: introduce pkg/migrations #56107

Closed
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,3 +754,27 @@ func (b *Batch) addSSTable(
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

// migrate is only exported on DB.
//lint:ignore U1001 unused
func (b *Batch) migrate(s, e interface{}, targetVersion roachpb.Version) {
irfansharif marked this conversation as resolved.
Show resolved Hide resolved
begin, err := marshalKey(s)
if err != nil {
b.initResult(0, 0, notRaw, err)
return
}
end, err := marshalKey(e)
if err != nil {
b.initResult(0, 0, notRaw, err)
return
}
req := &roachpb.MigrateRequest{
RequestHeader: roachpb.RequestHeader{
Key: begin,
EndKey: end,
},
TargetVersion: targetVersion,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}
12 changes: 12 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,18 @@ func (db *DB) AddSSTable(
return getOneErr(db.Run(ctx, b), b)
}

// Migrate proactively forces ranges overlapping with the provided keyspace to
// transition out of any legacy modes of operation (as defined by the target
// version).
//lint:ignore U1001 unused
func (db *DB) Migrate(
ctx context.Context, begin, end interface{}, targetVersion roachpb.Version,
) error {
b := &Batch{}
b.migrate(begin, end, targetVersion)
return getOneErr(db.Run(ctx, b), b)
}

// sendAndFill is a helper which sends the given batch and fills its results,
// returning the appropriate error which is either from the first failing call,
// or an "internal" error.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"cmd_lease_request.go",
"cmd_lease_transfer.go",
"cmd_merge.go",
"cmd_migrate.go",
"cmd_push_txn.go",
"cmd_put.go",
"cmd_query_intent.go",
Expand Down
46 changes: 46 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package batcheval

import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

func init() {
RegisterReadWriteCommand(roachpb.Migrate, declareKeysMigrate, Migrate)
}

func declareKeysMigrate(
_ *roachpb.RangeDescriptor, _ roachpb.Header, _ roachpb.Request, _, _ *spanset.SpanSet,
) {
}

// Migrate ensures that the range proactively carries out any outstanding
// below-Raft migrations.
//
// TODO(irfansharif): Do something real here. It's not currently wired up to
// anything. We'll eventually want a `map[roachpb.Version]Fn` mapping here that
// lets us define specific migrations for specific target versions.
func Migrate(
ctx context.Context, _ storage.ReadWriter, cArgs CommandArgs, _ roachpb.Response,
) (result.Result, error) {
args := cArgs.Args.(*roachpb.MigrateRequest)
h := cArgs.Header

log.Infof(ctx, "evaluating Migrate command for r%d@%s [%s, %s)", h.RangeID, args.TargetVersion, args.Key, args.EndKey)
return result.Result{}, nil
}
24 changes: 24 additions & 0 deletions pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,30 @@ func (r *Replica) executeWriteBatch(
log.Warningf(ctx, "%v", err)
}
}
if ba.Requests[0].GetMigrate() != nil && propResult.Err == nil {
// Migrate is special since it wants commands to be durably
// applied on all peers, which we achieve via waitForApplication.
//
// TODO(tbg,irfansharif): Could a snapshot below maxLeaseIndex
// be in-flight to a follower that's not in `desc` (but will be
// in it soon)? This seems to be in the realm of what's possible
// in theory: we'll only ever proactively send a snap when the
// peer is a learner, but an old snapshot that precedes
// maxLeaseIndex may be in flight for no good reason and once
// that peer is added to desc (after this code has done its
// work) and receives said zombie snap, it'll be initialized
// without having caught up past the migration. What's the
// safeguard needed here? I'm not sure. If we force a log
// truncation right after this proposal, would that force all
// to-be replicas to first catch up? Does that provide the
// invariants we want?
desc := r.Desc()
// NB: waitForApplication already has a timeout.
applicationErr := waitForApplication(
ctx, r.store.cfg.NodeDialer, desc.RangeID, desc.Replicas().All(),
uint64(maxLeaseIndex))
propResult.Err = roachpb.NewError(applicationErr)
}
return propResult.Reply, nil, propResult.Err
case <-slowTimer.C:
slowTimer.Read = true
Expand Down
10 changes: 10 additions & 0 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,9 @@ func (*ImportRequest) Method() Method { return Import }
// Method implements the Request interface.
func (*AdminScatterRequest) Method() Method { return AdminScatter }

// Method implements the Request interface.
func (*MigrateRequest) Method() Method { return Migrate }

// Method implements the Request interface.
func (*AddSSTableRequest) Method() Method { return AddSSTable }

Expand Down Expand Up @@ -970,6 +973,12 @@ func (r *AdminScatterRequest) ShallowCopy() Request {
return &shallowCopy
}

// ShallowCopy implements the Request interface.
func (r *MigrateRequest) ShallowCopy() Request {
shallowCopy := *r
return &shallowCopy
}

// ShallowCopy implements the Request interface.
func (r *AddSSTableRequest) ShallowCopy() Request {
shallowCopy := *r
Expand Down Expand Up @@ -1301,6 +1310,7 @@ func (*ExportRequest) flags() int { return isRead | isRan
func (*ImportRequest) flags() int { return isAdmin | isAlone }
func (*AdminScatterRequest) flags() int { return isAdmin | isRange | isAlone }
func (*AdminVerifyProtectedTimestampRequest) flags() int { return isAdmin | isRange | isAlone }
func (*MigrateRequest) flags() int { return isWrite | isRange | isAlone }
func (*AddSSTableRequest) flags() int {
return isWrite | isRange | isAlone | isUnsplittable | canBackpressure
}
Expand Down
Loading