Skip to content

Commit

Permalink
internal/client: Goland-move to pkg/kv
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
tbg committed Mar 9, 2020
1 parent 0208965 commit f176a39
Show file tree
Hide file tree
Showing 293 changed files with 2,100 additions and 2,151 deletions.
14 changes: 7 additions & 7 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -60,7 +60,7 @@ func countRows(raw roachpb.BulkOpSummary, pkIDs map[uint64]struct{}) RowCount {
return res
}

func allRangeDescriptors(ctx context.Context, txn *client.Txn) ([]roachpb.RangeDescriptor, error) {
func allRangeDescriptors(ctx context.Context, txn *kv.Txn) ([]roachpb.RangeDescriptor, error) {
rows, err := txn.Scan(ctx, keys.Meta2Prefix, keys.MetaMax, 0)
if err != nil {
return nil, errors.Wrapf(err,
Expand Down Expand Up @@ -161,7 +161,7 @@ type spanAndTime struct {
// file.
func backup(
ctx context.Context,
db *client.DB,
db *kv.DB,
gossip *gossip.Gossip,
settings *cluster.Settings,
defaultStore cloud.ExternalStorage,
Expand All @@ -186,7 +186,7 @@ func backup(
var checkpointMu syncutil.Mutex

var ranges []roachpb.RangeDescriptor
if err := db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
var err error
// TODO(benesch): limit the range descriptors we fetch to the ranges that
// are actually relevant in the backup to speed up small backups on large
Expand Down Expand Up @@ -306,7 +306,7 @@ func backup(
MVCCFilter: roachpb.MVCCFilter(backupManifest.MVCCFilter),
Encryption: encryption,
}
rawRes, pErr := client.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req)
rawRes, pErr := kv.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req)
if pErr != nil {
return pErr.GoError()
}
Expand Down Expand Up @@ -518,7 +518,7 @@ func (b *backupResumer) Resume(
return nil
}

func (b *backupResumer) clearStats(ctx context.Context, DB *client.DB) error {
func (b *backupResumer) clearStats(ctx context.Context, DB *kv.DB) error {
details := b.job.Details().(jobspb.BackupDetails)
var backupManifest BackupManifest
if err := protoutil.Unmarshal(details.BackupManifest, &backupManifest); err != nil {
Expand All @@ -530,7 +530,7 @@ func (b *backupResumer) clearStats(ctx context.Context, DB *client.DB) error {
return err
}
details.BackupManifest = descBytes
err = DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
err = DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return b.job.WithTxn(txn).SetDetails(ctx, details)
})
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl/sampledataccl"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -2475,7 +2475,7 @@ func TestRestoreAsOfSystemTimeGCBounds(t *testing.T) {
},
Threshold: tc.Server(0).Clock().Now(),
}
if _, err := client.SendWrapped(
if _, err := kv.SendWrapped(
ctx, tc.Server(0).DistSenderI().(*kvcoord.DistSender), &gcr,
); err != nil {
t.Fatal(err)
Expand Down
26 changes: 13 additions & 13 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -291,7 +291,7 @@ rangeLoop:
func splitAndScatter(
restoreCtx context.Context,
settings *cluster.Settings,
db *client.DB,
db *kv.DB,
kr *storageccl.KeyRewriter,
numClusterNodes int,
importSpans []importEntry,
Expand Down Expand Up @@ -353,7 +353,7 @@ func splitAndScatter(
// span being restored into.
RandomizeLeases: true,
}
if _, pErr := client.SendWrapped(ctx, db.NonTransactionalSender(), scatterReq); pErr != nil {
if _, pErr := kv.SendWrapped(ctx, db.NonTransactionalSender(), scatterReq); pErr != nil {
// TODO(dan): Unfortunately, Scatter is still too unreliable to
// fail the RESTORE when Scatter fails. I'm uncomfortable that
// this could break entirely and not start failing the tests,
Expand Down Expand Up @@ -397,7 +397,7 @@ func splitAndScatter(
scatterReq := &roachpb.AdminScatterRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(roachpb.Span{Key: newSpanKey, EndKey: newSpanKey.Next()}),
}
if _, pErr := client.SendWrapped(ctx, db.NonTransactionalSender(), scatterReq); pErr != nil {
if _, pErr := kv.SendWrapped(ctx, db.NonTransactionalSender(), scatterReq); pErr != nil {
// TODO(dan): Unfortunately, Scatter is still too unreliable to
// fail the RESTORE when Scatter fails. I'm uncomfortable that
// this could break entirely and not start failing the tests,
Expand Down Expand Up @@ -427,7 +427,7 @@ func splitAndScatter(
// on that database at the time this function is called.
func WriteTableDescs(
ctx context.Context,
txn *client.Txn,
txn *kv.Txn,
databases []*sqlbase.DatabaseDescriptor,
tables []*sqlbase.TableDescriptor,
descCoverage tree.DescriptorCoverage,
Expand Down Expand Up @@ -557,7 +557,7 @@ func rewriteBackupSpanKey(kr *storageccl.KeyRewriter, key roachpb.Key) (roachpb.
// files.
func restore(
restoreCtx context.Context,
db *client.DB,
db *kv.DB,
gossip *gossip.Gossip,
settings *cluster.Settings,
backupManifests []BackupManifest,
Expand Down Expand Up @@ -710,7 +710,7 @@ func restore(
defer tracing.FinishSpan(importSpan)
defer func() { <-importsSem }()

importRes, pErr := client.SendWrapped(ctx, db.NonTransactionalSender(), importRequest)
importRes, pErr := kv.SendWrapped(ctx, db.NonTransactionalSender(), importRequest)
if pErr != nil {
return errors.Wrapf(pErr.GoError(), "importing span %v", importRequest.DataSpan)

Expand Down Expand Up @@ -834,14 +834,14 @@ func remapRelevantStatistics(
// after the other.
func isDatabaseEmpty(
ctx context.Context,
db *client.DB,
db *kv.DB,
dbDesc *sql.DatabaseDescriptor,
ignoredTables map[sqlbase.ID]struct{},
) (bool, error) {
var allDescs []sqlbase.Descriptor
if err := db.Txn(
ctx,
func(ctx context.Context, txn *client.Txn) error {
func(ctx context.Context, txn *kv.Txn) error {
var err error
allDescs, err = allSQLDescriptors(ctx, txn)
return err
Expand Down Expand Up @@ -923,7 +923,7 @@ func createImportingTables(
}

if !details.PrepareCompleted {
err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Write the new TableDescriptors which are set in the OFFLINE state.
if err := WriteTableDescs(ctx, txn, databases, tables, details.DescriptorCoverage, r.job.Payload().Username, r.settings, nil /* extra */); err != nil {
return errors.Wrapf(err, "restoring %d TableDescriptors from %d databases", len(r.tables), len(databases))
Expand Down Expand Up @@ -1029,7 +1029,7 @@ func (r *restoreResumer) insertStats(ctx context.Context) error {
return nil
}

err := r.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
err := r.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := stats.InsertNewStats(ctx, r.execCfg.InternalExecutor, txn, r.latestStats); err != nil {
return errors.Wrapf(err, "inserting stats from backup")
}
Expand All @@ -1053,7 +1053,7 @@ func (r *restoreResumer) publishTables(ctx context.Context) error {
}
log.Event(ctx, "making tables live")

err := r.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
err := r.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Write the new TableDescriptors and flip state over to public so they can be
// accessed.
b := txn.NewBatch()
Expand Down Expand Up @@ -1107,7 +1107,7 @@ func (r *restoreResumer) OnFailOrCancel(ctx context.Context, phs interface{}) er
}

// dropTables implements the OnFailOrCancel logic.
func (r *restoreResumer) dropTables(ctx context.Context, txn *client.Txn) error {
func (r *restoreResumer) dropTables(ctx context.Context, txn *kv.Txn) error {
details := r.job.Details().(jobspb.RestoreDetails)

// No need to mark the tables as dropped if they were not even created in the
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/covering"
Expand Down Expand Up @@ -219,7 +219,7 @@ func allocateTableRewrites(

// Fail fast if the necessary databases don't exist or are otherwise
// incompatible with this restore.
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
maxExpectedDB := keys.MinUserDescID + sql.MaxDefaultDescriptorID
// Check that any DBs being restored do _not_ exist.
for name := range restoreDBNames {
Expand Down
18 changes: 8 additions & 10 deletions pkg/ccl/backupccl/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"sort"

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -297,7 +297,7 @@ func descriptorsMatchingTargets(
// a descriptor the the ID by which it was previously known (e.g pre-TRUNCATE).
func getRelevantDescChanges(
ctx context.Context,
db *client.DB,
db *kv.DB,
startTime, endTime hlc.Timestamp,
descs []sqlbase.Descriptor,
expanded []sqlbase.ID,
Expand Down Expand Up @@ -402,7 +402,7 @@ func getRelevantDescChanges(
// nil content).
func getAllDescChanges(
ctx context.Context,
db *client.DB,
db *kv.DB,
startTime, endTime hlc.Timestamp,
priorIDs map[sqlbase.ID]sqlbase.ID,
) ([]BackupManifest_DescriptorRevision, error) {
Expand Down Expand Up @@ -440,7 +440,7 @@ func getAllDescChanges(
return res, nil
}

func allSQLDescriptors(ctx context.Context, txn *client.Txn) ([]sqlbase.Descriptor, error) {
func allSQLDescriptors(ctx context.Context, txn *kv.Txn) ([]sqlbase.Descriptor, error) {
startKey := roachpb.Key(keys.MakeTablePrefix(keys.DescriptorTableID))
endKey := startKey.PrefixEnd()
rows, err := txn.Scan(ctx, startKey, endKey, 0)
Expand Down Expand Up @@ -492,12 +492,12 @@ func ensureInterleavesIncluded(tables []*sqlbase.TableDescriptor) error {
}

func loadAllDescs(
ctx context.Context, db *client.DB, asOf hlc.Timestamp,
ctx context.Context, db *kv.DB, asOf hlc.Timestamp,
) ([]sqlbase.Descriptor, error) {
var allDescs []sqlbase.Descriptor
if err := db.Txn(
ctx,
func(ctx context.Context, txn *client.Txn) error {
func(ctx context.Context, txn *kv.Txn) error {
var err error
txn.SetFixedTimestamp(ctx, asOf)
allDescs, err = allSQLDescriptors(ctx, txn)
Expand Down Expand Up @@ -592,7 +592,7 @@ func fullClusterTargets(
return fullClusterDescs, fullClusterDBs, nil
}

func lookupDatabaseID(ctx context.Context, txn *client.Txn, name string) (sqlbase.ID, error) {
func lookupDatabaseID(ctx context.Context, txn *kv.Txn, name string) (sqlbase.ID, error) {
found, id, err := sqlbase.LookupDatabaseID(ctx, txn, name)
if err != nil {
return sqlbase.InvalidID, err
Expand All @@ -605,9 +605,7 @@ func lookupDatabaseID(ctx context.Context, txn *client.Txn, name string) (sqlbas

// CheckTableExists returns an error if a table already exists with given
// parent and name.
func CheckTableExists(
ctx context.Context, txn *client.Txn, parentID sqlbase.ID, name string,
) error {
func CheckTableExists(ctx context.Context, txn *kv.Txn, parentID sqlbase.ID, name string) error {
found, _, err := sqlbase.LookupPublicTableID(ctx, txn, parentID, name)
if err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ package changefeedccl
import (
"context"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
Expand Down Expand Up @@ -89,7 +89,7 @@ func distChangefeedFlow(
}

// Changefeed flows handle transactional consistency themselves.
var noTxn *client.Txn
var noTxn *kv.Txn
gatewayNodeID := execCfg.NodeID.Get()
dsp := phs.DistSQLPlanner()
evalCtx := phs.ExtendedEvalContext()
Expand Down Expand Up @@ -195,10 +195,10 @@ func distChangefeedFlow(
}

func fetchSpansForTargets(
ctx context.Context, db *client.DB, targets jobspb.ChangefeedTargets, ts hlc.Timestamp,
ctx context.Context, db *kv.DB, targets jobspb.ChangefeedTargets, ts hlc.Timestamp,
) ([]roachpb.Span, error) {
var spans []roachpb.Span
err := db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
spans = nil
txn.SetFixedTimestamp(ctx, ts)
// Note that all targets are currently guaranteed to be tables.
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server"
Expand Down Expand Up @@ -796,7 +796,7 @@ func fetchDescVersionModificationTime(
}
clock := hlc.NewClock(hlc.UnixNano, time.Minute)
hh := roachpb.Header{Timestamp: clock.Now()}
res, pErr := client.SendWrappedWith(context.Background(),
res, pErr := kv.SendWrappedWith(context.Background(),
f.Server().DB().NonTransactionalSender(), hh, req)
if pErr != nil {
t.Fatal(pErr.GoError())
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand All @@ -36,7 +36,7 @@ import (
// Config configures a kvfeed.
type Config struct {
Settings *cluster.Settings
DB *client.DB
DB *kv.DB
Clock *hlc.Clock
Gossip *gossip.Gossip
Spans []roachpb.Span
Expand Down Expand Up @@ -83,7 +83,7 @@ func Run(ctx context.Context, cfg Config) error {
var pff physicalFeedFactory
{
sender := cfg.DB.NonTransactionalSender()
distSender := sender.(*client.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)
distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)
pff = rangefeedFactory(distSender.RangeFeed)
}
bf := func() EventBuffer {
Expand Down
Loading

0 comments on commit f176a39

Please sign in to comment.