From 4b0baf52de8386e3a4ba93c0cc63494d7180c79a Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten <nvanbenschoten@gmail.com> Date: Thu, 24 Mar 2016 16:22:00 -0400 Subject: [PATCH] storage: Link Node.Batch context correctly to timeout cmds In #5368, I was seeing that a call to `redirectOnOrAcquireLeaderLease` had been stuck for 541 minutes. This function selects on a leader lease channel, but also selects on a context cancellation. This means that the context should have timed out. It looks like we had dropped the original context with timeout in `Node.Batch`, which came from `kv.sendOne`. This change should properly link these two contexts together so that the timeout in the stuck command would work correctly. --- server/node.go | 20 ++- storage/gc_queue.go | 7 +- storage/gc_queue_test.go | 8 +- storage/intent_resolver.go | 2 +- storage/intent_resolver_test.go | 4 +- storage/queue.go | 4 +- storage/raft_transport.go | 4 +- storage/replica.go | 28 +-- storage/replica_command.go | 11 +- storage/replica_test.go | 290 ++++++++++++++++---------------- storage/split_queue.go | 5 +- storage/store.go | 16 +- storage/stores.go | 2 +- 13 files changed, 210 insertions(+), 191 deletions(-) diff --git a/server/node.go b/server/node.go index 727b9f756d59..8d2de33b667d 100644 --- a/server/node.go +++ b/server/node.go @@ -235,9 +235,13 @@ func NewNode(ctx storage.StoreContext, recorder *status.MetricsRecorder, stopper return n } -// context returns a context encapsulating the NodeID. -func (n *Node) context() context.Context { - return log.Add(context.Background(), log.NodeID, n.Descriptor.NodeID) +// context returns a context encapsulating the NodeID, derived from the +// supplied context (which is not allowed to be nil). +func (n *Node) context(ctx context.Context) context.Context { + if ctx == nil { + panic("ctx cannot be nil") + } + return log.Add(ctx, log.NodeID, n.Descriptor.NodeID) } // initDescriptor initializes the node descriptor with the server @@ -330,7 +334,7 @@ func (n *Node) start(addr net.Addr, engines []engine.Engine, attrs roachpb.Attri // Record node started event. n.recordJoinEvent() - log.Infoc(n.context(), "Started node with %v engine(s) and attributes %v", engines, attrs.Attrs) + log.Infoc(n.context(context.TODO()), "Started node with %v engine(s) and attributes %v", engines, attrs.Attrs) return nil } @@ -703,6 +707,8 @@ func (n *Node) Batch(ctx context.Context, args *roachpb.BatchRequest) (*roachpb. // back with the response. This is more expensive, but then again, // those are individual requests traced by users, so they can be. if sp.BaggageItem(tracing.Snowball) != "" { + sp.LogEvent("delegating to snowball tracing") + sp.Finish() if sp, err = tracing.JoinOrNewSnowball(opName, args.Trace, func(rawSpan basictracer.RawSpan) { encSp, err := tracing.EncodeRawSpan(&rawSpan, nil) if err != nil { @@ -715,14 +721,14 @@ func (n *Node) Batch(ctx context.Context, args *roachpb.BatchRequest) (*roachpb. } } defer sp.Finish() - ctx := opentracing.ContextWithSpan((*Node)(n).context(), sp) + traceCtx := opentracing.ContextWithSpan(n.context(ctx), sp) tStart := timeutil.Now() var pErr *roachpb.Error - br, pErr = n.stores.Send(ctx, *args) + br, pErr = n.stores.Send(traceCtx, *args) if pErr != nil { br = &roachpb.BatchResponse{} - log.Trace(ctx, fmt.Sprintf("error: %T", pErr.GetDetail())) + log.Trace(traceCtx, fmt.Sprintf("error: %T", pErr.GetDetail())) } if br.Error != nil { panic(roachpb.ErrorUnexpectedlySet(n.stores, br)) diff --git a/storage/gc_queue.go b/storage/gc_queue.go index 4434901da667..65d33af715ef 100644 --- a/storage/gc_queue.go +++ b/storage/gc_queue.go @@ -22,6 +22,8 @@ import ( "sync" "time" + "golang.org/x/net/context" + "github.com/gogo/protobuf/proto" "github.com/cockroachdb/cockroach/client" @@ -330,6 +332,7 @@ func processSequenceCache( // 8) send a GCRequest. func (gcq *gcQueue) process(now roachpb.Timestamp, repl *Replica, sysCfg config.SystemConfig) error { + ctx := repl.context(context.TODO()) snap := repl.store.Engine().NewSnapshot() desc := repl.Desc() @@ -346,7 +349,7 @@ func (gcq *gcQueue) process(now roachpb.Timestamp, repl *Replica, pushTxn(repl, now, txn, typ) }, func(intents []roachpb.Intent, poison bool, wait bool) error { - return repl.store.intentResolver.resolveIntents(repl.context(), repl, intents, poison, wait) + return repl.store.intentResolver.resolveIntents(ctx, repl, intents, poison, wait) }) if err != nil { @@ -369,7 +372,7 @@ func (gcq *gcQueue) process(now roachpb.Timestamp, repl *Replica, ba.RangeID = desc.RangeID ba.Timestamp = now ba.Add(&gcArgs) - if _, pErr := repl.Send(repl.context(), ba); pErr != nil { + if _, pErr := repl.Send(ctx, ba); pErr != nil { return pErr.GoError() } return nil diff --git a/storage/gc_queue_test.go b/storage/gc_queue_test.go index 6a05c20285a4..b26e527bddc5 100644 --- a/storage/gc_queue_test.go +++ b/storage/gc_queue_test.go @@ -23,6 +23,8 @@ import ( "testing" "time" + "golang.org/x/net/context" + "github.com/cockroachdb/cockroach/client" "github.com/cockroachdb/cockroach/keys" "github.com/cockroachdb/cockroach/roachpb" @@ -220,7 +222,7 @@ func TestGCQueueProcess(t *testing.T) { txn.OrigTimestamp = datum.ts txn.Timestamp = datum.ts } - if _, err := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + if _, err := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ Timestamp: datum.ts, Txn: txn, }, &dArgs); err != nil { @@ -234,7 +236,7 @@ func TestGCQueueProcess(t *testing.T) { txn.OrigTimestamp = datum.ts txn.Timestamp = datum.ts } - if _, err := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + if _, err := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ Timestamp: datum.ts, Txn: txn, }, &pArgs); err != nil { @@ -477,7 +479,7 @@ func TestGCQueueIntentResolution(t *testing.T) { // TODO(spencerkimball): benchmark with ~50k. for j := 0; j < 5; j++ { pArgs := putArgs(roachpb.Key(fmt.Sprintf("%d-%05d", i, j)), []byte("value")) - if _, err := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + if _, err := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ Txn: txns[i], }, &pArgs); err != nil { t.Fatalf("%d: could not put data: %s", i, err) diff --git a/storage/intent_resolver.go b/storage/intent_resolver.go index 6e11b721140f..593eaa244fb9 100644 --- a/storage/intent_resolver.go +++ b/storage/intent_resolver.go @@ -249,7 +249,7 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA return } now := r.store.Clock().Now() - ctx := r.context() + ctx := r.context(context.TODO()) stopper := r.store.Stopper() for _, item := range intents { diff --git a/storage/intent_resolver_test.go b/storage/intent_resolver_test.go index d83665f453b5..c4b52ba1c7f9 100644 --- a/storage/intent_resolver_test.go +++ b/storage/intent_resolver_test.go @@ -20,6 +20,8 @@ package storage import ( "testing" + "golang.org/x/net/context" + "github.com/cockroachdb/cockroach/roachpb" "github.com/cockroachdb/cockroach/testutils" "github.com/cockroachdb/cockroach/util/leaktest" @@ -36,7 +38,7 @@ func TestPushTransactionsWithNonPendingIntent(t *testing.T) { intents := []roachpb.Intent{{Span: roachpb.Span{Key: roachpb.Key("a")}, Status: roachpb.ABORTED}} if _, pErr := tc.store.intentResolver.maybePushTransactions( - tc.rng.context(), intents, roachpb.Header{}, roachpb.PUSH_TOUCH, true); !testutils.IsPError(pErr, "unexpected aborted/resolved intent") { + tc.rng.context(context.Background()), intents, roachpb.Header{}, roachpb.PUSH_TOUCH, true); !testutils.IsPError(pErr, "unexpected aborted/resolved intent") { t.Errorf("expected error on aborted/resolved intent, but got %s", pErr) } } diff --git a/storage/queue.go b/storage/queue.go index 33451ffb5bee..c1cf6d6ddf2b 100644 --- a/storage/queue.go +++ b/storage/queue.go @@ -24,6 +24,7 @@ import ( "sync/atomic" "time" + "golang.org/x/net/context" "golang.org/x/net/trace" "github.com/cockroachdb/cockroach/config" @@ -115,6 +116,7 @@ type queueImpl interface { // process accepts current time, a replica, and the system config // and executes queue-specific work on it. + // TODO(nvanbenschoten) this should provide a context.Context. process(roachpb.Timestamp, *Replica, config.SystemConfig) error // timer returns a duration to wait between processing the next item @@ -414,7 +416,7 @@ func (bq *baseQueue) processReplica(repl *Replica, clock *hlc.Clock) error { // and renew or acquire if necessary. if bq.impl.needsLeaderLease() { sp := repl.store.Tracer().StartSpan(bq.name) - ctx := opentracing.ContextWithSpan(repl.context(), sp) + ctx := opentracing.ContextWithSpan(repl.context(context.Background()), sp) defer sp.Finish() // Create a "fake" get request in order to invoke redirectOnOrAcquireLease. if err := repl.redirectOnOrAcquireLeaderLease(ctx); err != nil { diff --git a/storage/raft_transport.go b/storage/raft_transport.go index ae99be5753f9..0f3dd11f0075 100644 --- a/storage/raft_transport.go +++ b/storage/raft_transport.go @@ -181,7 +181,7 @@ func (t *RaftTransport) processQueue(nodeID roachpb.NodeID) { return } client := NewMultiRaftClient(conn) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.TODO()) defer cancel() if log.V(1) { log.Infof("establishing Raft transport stream to node %d at %s", nodeID, addr) @@ -227,7 +227,7 @@ func (t *RaftTransport) processQueue(nodeID roachpb.NodeID) { return case req := <-ch: if req.Message.Type == raftpb.MsgSnap { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.TODO()) defer cancel() snapStream, err := client.RaftMessage(ctx) if err != nil { diff --git a/storage/replica.go b/storage/replica.go index 0a0fd3333207..3b215f1fb8a1 100644 --- a/storage/replica.go +++ b/storage/replica.go @@ -376,12 +376,12 @@ func (r *Replica) setReplicaIDLocked(replicaID roachpb.ReplicaID) error { return nil } -// context returns a context which is initialized with information about -// this range. It is only relevant when commands need to be executed -// on this range in the absence of a pre-existing context, such as -// during range scanner operations. -func (r *Replica) context() context.Context { - return context.WithValue(r.store.Context(nil), log.RangeID, r.RangeID) +// context returns a context with information about this range, derived from +// the supplied context (which is not allowed to be nil). It is only relevant +// when commands need to be executed on this range in the absence of a +// pre-existing context, such as during range scanner operations. +func (r *Replica) context(ctx context.Context) context.Context { + return context.WithValue(r.store.context(ctx), log.RangeID, r.RangeID) } // GetMaxBytes atomically gets the range maximum byte limit. @@ -479,7 +479,7 @@ func (r *Replica) requestLeaderLease(timestamp roachpb.Timestamp) <-chan *roachp // checks from normal request machinery, (e.g. the command queue). // Note that the command itself isn't traced, but usually the caller // waiting for the result has an active Trace. - cmd, err := r.proposeRaftCommand(r.context(), ba) + cmd, err := r.proposeRaftCommand(r.context(context.Background()), ba) if err != nil { return roachpb.NewError(err) } @@ -1399,7 +1399,7 @@ func (r *Replica) sendRaftMessage(msg raftpb.Message) { // the command's done channel, if available. func (r *Replica) processRaftCommand(idKey storagebase.CmdIDKey, index uint64, raftCmd roachpb.RaftCommand) *roachpb.Error { if index == 0 { - log.Fatalc(r.context(), "processRaftCommand requires a non-zero index") + log.Fatalc(r.context(context.TODO()), "processRaftCommand requires a non-zero index") } r.mu.Lock() @@ -1413,7 +1413,7 @@ func (r *Replica) processRaftCommand(idKey storagebase.CmdIDKey, index uint64, r ctx = cmd.ctx } else { // TODO(tschottdorf): consider the Trace situation here. - ctx = r.context() + ctx = r.context(context.Background()) } log.Trace(ctx, "applying batch") @@ -1426,7 +1426,7 @@ func (r *Replica) processRaftCommand(idKey storagebase.CmdIDKey, index uint64, r if cmd != nil { cmd.done <- roachpb.ResponseWithError{Reply: br, Err: err} } else if err != nil && log.V(1) { - log.Errorc(r.context(), "error executing raft command: %s", err) + log.Errorc(r.context(context.TODO()), "error executing raft command: %s", err) } return err @@ -1815,7 +1815,7 @@ func (r *Replica) maybeGossipFirstRange() *roachpb.Error { return nil } - ctx := r.context() + ctx := r.context(context.TODO()) // When multiple nodes are initialized with overlapping Gossip addresses, they all // will attempt to gossip their cluster ID. This is a fairly obvious misconfiguration, @@ -1879,7 +1879,7 @@ func (r *Replica) maybeGossipSystemConfig() { return } - ctx := r.context() + ctx := r.context(context.TODO()) // TODO(marc): check for bad split in the middle of the SystemConfig span. kvs, hash, err := r.loadSystemConfigSpan() if err != nil { @@ -1928,7 +1928,7 @@ func newReplicaCorruptionError(errs ...error) *roachpb.ReplicaCorruptionError { // range, store, node or cluster with corresponding actions taken. func (r *Replica) maybeSetCorrupt(pErr *roachpb.Error) *roachpb.Error { if cErr, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok { - log.Errorc(r.context(), "stalling replica due to: %s", cErr.ErrorMsg) + log.Errorc(r.context(context.TODO()), "stalling replica due to: %s", cErr.ErrorMsg) cErr.Processed = true return roachpb.NewError(cErr) } @@ -1946,7 +1946,7 @@ func (r *Replica) loadSystemConfigSpan() ([]roachpb.KeyValue, []byte, error) { ba.Timestamp = r.store.Clock().Now() ba.Add(&roachpb.ScanRequest{Span: keys.SystemConfigSpan}) br, intents, pErr := - r.executeBatch(r.context(), storagebase.CmdIDKey(""), r.store.Engine(), nil, ba) + r.executeBatch(r.context(context.TODO()), storagebase.CmdIDKey(""), r.store.Engine(), nil, ba) if pErr != nil { return nil, nil, pErr.GoError() } diff --git a/storage/replica_command.go b/storage/replica_command.go index 9a33833b1243..d3db3bf52dd5 100644 --- a/storage/replica_command.go +++ b/storage/replica_command.go @@ -123,7 +123,7 @@ func (r *Replica) executeCmd(ctx context.Context, raftCmdID storagebase.CmdIDKey reply = &resp case *roachpb.EndTransactionRequest: var resp roachpb.EndTransactionResponse - resp, intents, err = r.EndTransaction(batch, ms, h, *tArgs) + resp, intents, err = r.EndTransaction(ctx, batch, ms, h, *tArgs) reply = &resp case *roachpb.RangeLookupRequest: var resp roachpb.RangeLookupResponse @@ -370,7 +370,7 @@ func (r *Replica) BeginTransaction( // TODO(tschottdorf): return nil reply on any error. The error itself // must be the authoritative source of information. func (r *Replica) EndTransaction( - batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, args roachpb.EndTransactionRequest, + ctx context.Context, batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, args roachpb.EndTransactionRequest, ) (roachpb.EndTransactionResponse, []roachpb.Intent, error) { var reply roachpb.EndTransactionResponse @@ -558,7 +558,7 @@ func (r *Replica) EndTransaction( if err := func() error { if ct.GetSplitTrigger() != nil { - if err := r.splitTrigger(r.context(), batch, ms, ct.SplitTrigger, reply.Txn.Timestamp); err != nil { + if err := r.splitTrigger(r.context(ctx), batch, ms, ct.SplitTrigger, reply.Txn.Timestamp); err != nil { return err } *ms = engine.MVCCStats{} // clear stats, as split recomputed. @@ -1379,6 +1379,7 @@ func (r *Replica) LeaderLease( func (r *Replica) CheckConsistency( args roachpb.CheckConsistencyRequest, desc *roachpb.RangeDescriptor, ) (roachpb.CheckConsistencyResponse, *roachpb.Error) { + ctx := r.context(context.TODO()) key := desc.StartKey.AsRawKey() endKey := desc.EndKey.AsRawKey() id := uuid.MakeV4() @@ -1396,7 +1397,7 @@ func (r *Replica) CheckConsistency( ChecksumID: id, } ba.Add(checkArgs) - _, pErr := r.Send(r.context(), ba) + _, pErr := r.Send(ctx, ba) if pErr != nil { return roachpb.CheckConsistencyResponse{}, pErr } @@ -1427,7 +1428,7 @@ func (r *Replica) CheckConsistency( Checksum: c.checksum, } ba.Add(checkArgs) - _, pErr := r.Send(r.context(), ba) + _, pErr := r.Send(ctx, ba) if pErr != nil { return roachpb.CheckConsistencyResponse{}, pErr } diff --git a/storage/replica_test.go b/storage/replica_test.go index 76abfa5da20f..24c17b121a41 100644 --- a/storage/replica_test.go +++ b/storage/replica_test.go @@ -295,7 +295,7 @@ func setLeaderLease(t *testing.T, r *Replica, l *roachpb.Lease) { ba := roachpb.BatchRequest{} ba.Timestamp = r.store.Clock().Now() ba.Add(&roachpb.LeaderLeaseRequest{Lease: *l}) - pendingCmd, err := r.proposeRaftCommand(r.context(), ba) + pendingCmd, err := r.proposeRaftCommand(r.context(context.Background()), ba) if err == nil { // Next if the command was committed, wait for the range to apply it. // TODO(bdarnell): refactor this to a more conventional error-handling pattern. @@ -331,13 +331,13 @@ func TestReplicaReadConsistency(t *testing.T) { // Try consistent read and verify success. - if _, err := client.SendWrapped(tc.Sender(), tc.rng.context(), &gArgs); err != nil { + if _, err := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &gArgs); err != nil { t.Errorf("expected success on consistent read: %s", err) } // Try a consensus read and verify error. - if _, err := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + if _, err := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ ReadConsistency: roachpb.CONSENSUS, }, &gArgs); err == nil { t.Errorf("expected error on consensus read") @@ -346,7 +346,7 @@ func TestReplicaReadConsistency(t *testing.T) { // Try an inconsistent read within a transaction. txn := newTransaction("test", roachpb.Key("a"), 1, roachpb.SERIALIZABLE, tc.clock) - if _, err := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + if _, err := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ Txn: txn, ReadConsistency: roachpb.INCONSISTENT, }, &gArgs); err == nil { @@ -368,14 +368,14 @@ func TestReplicaReadConsistency(t *testing.T) { }) // Send without Txn. - _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ ReadConsistency: roachpb.CONSISTENT, }, &gArgs) if _, ok := pErr.GetDetail().(*roachpb.NotLeaderError); !ok { t.Errorf("expected not leader error; got %s", pErr) } - if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ ReadConsistency: roachpb.INCONSISTENT, }, &gArgs); pErr != nil { t.Errorf("expected success reading with inconsistent: %s", pErr) @@ -435,7 +435,7 @@ func TestReplicaRangeBoundsChecking(t *testing.T) { splitTestRange(tc.store, roachpb.RKey("a"), roachpb.RKey("a"), t) gArgs := getArgs(roachpb.Key("b")) - _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &gArgs) + _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &gArgs) if _, ok := pErr.GetDetail().(*roachpb.RangeKeyMismatchError); !ok { t.Errorf("expected range key mismatch error: %s", pErr) @@ -482,7 +482,7 @@ func TestReplicaLeaderLease(t *testing.T) { } { - pErr := tc.rng.redirectOnOrAcquireLeaderLease(tc.rng.context()) + pErr := tc.rng.redirectOnOrAcquireLeaderLease(tc.rng.context(context.Background())) if lErr, ok := pErr.GetDetail().(*roachpb.NotLeaderError); !ok || lErr == nil { t.Fatalf("wanted NotLeaderError, got %s", pErr) } @@ -509,7 +509,7 @@ func TestReplicaLeaderLease(t *testing.T) { rng.mu.Unlock() { - if _, ok := rng.redirectOnOrAcquireLeaderLease(tc.rng.context()).GetDetail().(*roachpb.NotLeaderError); !ok { + if _, ok := rng.redirectOnOrAcquireLeaderLease(tc.rng.context(context.Background())).GetDetail().(*roachpb.NotLeaderError); !ok { t.Fatalf("expected %T, got %s", &roachpb.NotLeaderError{}, err) } } @@ -567,7 +567,7 @@ func TestReplicaNotLeaderError(t *testing.T) { } for i, test := range testCases { - _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{Timestamp: now}, test) + _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{Timestamp: now}, test) if _, ok := pErr.GetDetail().(*roachpb.NotLeaderError); !ok { t.Errorf("%d: expected not leader error: %s", i, pErr) @@ -752,7 +752,7 @@ func TestReplicaLeaderLeaseRejectUnknownRaftNodeID(t *testing.T) { ba := roachpb.BatchRequest{} ba.Timestamp = tc.rng.store.Clock().Now() ba.Add(&roachpb.LeaderLeaseRequest{Lease: *lease}) - pendingCmd, err := tc.rng.proposeRaftCommand(tc.rng.context(), ba) + pendingCmd, err := tc.rng.proposeRaftCommand(tc.rng.context(context.Background()), ba) if err == nil { // Next if the command was committed, wait for the range to apply it. // TODO(bdarnell): refactor to a more conventional error-handling pattern. @@ -855,7 +855,7 @@ func TestReplicaNoGossipConfig(t *testing.T) { for i, test := range testCases { txn.Sequence++ - if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(), test.h, test.req); pErr != nil { + if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(context.Background()), test.h, test.req); pErr != nil { t.Fatal(pErr) } txn.Writing = true @@ -1090,7 +1090,7 @@ func TestAcquireLeaderLease(t *testing.T) { tc.manualClock.Increment(int64(DefaultLeaderLeaseDuration + 1000)) ts := tc.clock.Now().Next() - if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{Timestamp: ts}, test); pErr != nil { + if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{Timestamp: ts}, test); pErr != nil { t.Fatal(pErr) } if held, expired := hasLease(tc.rng, ts); !held || expired { @@ -1123,7 +1123,7 @@ func TestReplicaUpdateTSCache(t *testing.T) { gArgs := getArgs([]byte("a")) ts := tc.clock.Now() - _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{Timestamp: ts}, &gArgs) + _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{Timestamp: ts}, &gArgs) if pErr != nil { t.Error(pErr) @@ -1135,7 +1135,7 @@ func TestReplicaUpdateTSCache(t *testing.T) { drArgs := roachpb.NewDeleteRange(key, key.Next(), false) ts = tc.clock.Now() - _, pErr = client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{Timestamp: ts}, drArgs) + _, pErr = client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{Timestamp: ts}, drArgs) if pErr != nil { t.Error(pErr) @@ -1208,7 +1208,7 @@ func TestReplicaCommandQueue(t *testing.T) { tc.stopper.RunAsyncTask(func() { args := readOrWriteArgs(key1, test.cmd1Read) - _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ UserPriority: 42, }, args) @@ -1225,7 +1225,7 @@ func TestReplicaCommandQueue(t *testing.T) { tc.stopper.RunAsyncTask(func() { args := readOrWriteArgs(key1, test.cmd2Read) - _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), args) + _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), args) if pErr != nil { t.Fatalf("test %d: %s", i, pErr) @@ -1238,7 +1238,7 @@ func TestReplicaCommandQueue(t *testing.T) { tc.stopper.RunAsyncTask(func() { args := readOrWriteArgs(key2, true) - _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), args) + _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), args) if pErr != nil { t.Fatalf("test %d: %s", i, pErr) @@ -1316,7 +1316,7 @@ func TestReplicaCommandQueueInconsistent(t *testing.T) { go func() { args := putArgs(key, []byte{1}) - _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &args) + _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &args) if pErr != nil { t.Fatal(pErr) @@ -1331,7 +1331,7 @@ func TestReplicaCommandQueueInconsistent(t *testing.T) { go func() { args := getArgs(key) - _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ ReadConsistency: roachpb.INCONSISTENT, }, &args) @@ -1376,14 +1376,14 @@ func TestReplicaUseTSCache(t *testing.T) { tc.manualClock.Set(t0.Nanoseconds()) args := getArgs([]byte("a")) - _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &args) + _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &args) if pErr != nil { t.Error(pErr) } pArgs := putArgs([]byte("a"), []byte("value")) - _, respH, pErr := SendWrapped(tc.Sender(), tc.rng.context(), roachpb.Header{}, &pArgs) + _, respH, pErr := SendWrapped(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{}, &pArgs) if pErr != nil { t.Fatal(pErr) } @@ -1405,7 +1405,7 @@ func TestReplicaNoTSCacheInconsistent(t *testing.T) { args := getArgs([]byte("a")) ts := tc.clock.Now() - _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ Timestamp: ts, ReadConsistency: roachpb.INCONSISTENT, }, &args) @@ -1415,7 +1415,7 @@ func TestReplicaNoTSCacheInconsistent(t *testing.T) { } pArgs := putArgs([]byte("a"), []byte("value")) - _, respH, pErr := SendWrapped(tc.Sender(), tc.rng.context(), roachpb.Header{Timestamp: roachpb.ZeroTimestamp.Add(0, 1)}, &pArgs) + _, respH, pErr := SendWrapped(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{Timestamp: roachpb.ZeroTimestamp.Add(0, 1)}, &pArgs) if pErr != nil { t.Fatal(pErr) } @@ -1441,7 +1441,7 @@ func TestReplicaNoTSCacheUpdateOnFailure(t *testing.T) { pArgs := putArgs(key, []byte("value")) txn := newTransaction("test", key, 1, roachpb.SERIALIZABLE, tc.clock) - _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ Txn: txn, }, &pArgs) if pErr != nil { @@ -1452,7 +1452,7 @@ func TestReplicaNoTSCacheUpdateOnFailure(t *testing.T) { args := readOrWriteArgs(key, read) ts := tc.clock.Now() // later timestamp - if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ Timestamp: ts, }, args); pErr == nil { t.Errorf("test %d: expected failure", i) @@ -1460,7 +1460,7 @@ func TestReplicaNoTSCacheUpdateOnFailure(t *testing.T) { // Write the intent again -- should not have its timestamp upgraded! txn.Sequence++ - if _, respH, pErr := SendWrapped(tc.Sender(), tc.rng.context(), roachpb.Header{Txn: txn}, &pArgs); pErr != nil { + if _, respH, pErr := SendWrapped(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{Txn: txn}, &pArgs); pErr != nil { t.Fatalf("test %d: %s", i, pErr) } else if !respH.Txn.Timestamp.Equal(txn.Timestamp) { t.Errorf("expected timestamp not to advance %s != %s", respH.Timestamp, txn.Timestamp) @@ -1484,7 +1484,7 @@ func TestReplicaNoTimestampIncrementWithinTxn(t *testing.T) { // Start with a read to warm the timestamp cache. gArgs := getArgs(key) - if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ Txn: txn, }, &gArgs); pErr != nil { t.Fatal(pErr) @@ -1494,7 +1494,7 @@ func TestReplicaNoTimestampIncrementWithinTxn(t *testing.T) { pArgs := putArgs(key, []byte("value")) txn.Sequence++ - _, respH, pErr := SendWrapped(tc.Sender(), tc.rng.context(), roachpb.Header{Txn: txn}, &pArgs) + _, respH, pErr := SendWrapped(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{Txn: txn}, &pArgs) if pErr != nil { t.Fatal(pErr) } @@ -1509,7 +1509,7 @@ func TestReplicaNoTimestampIncrementWithinTxn(t *testing.T) { Status: roachpb.COMMITTED, } txn.Sequence++ - if _, pErr = client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{Txn: txn, Timestamp: txn.Timestamp}, rArgs); pErr != nil { + if _, pErr = client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{Txn: txn, Timestamp: txn.Timestamp}, rArgs); pErr != nil { t.Fatal(pErr) } @@ -1518,7 +1518,7 @@ func TestReplicaNoTimestampIncrementWithinTxn(t *testing.T) { expTS := ts expTS.Logical++ - _, respH, pErr = SendWrapped(tc.Sender(), tc.rng.context(), roachpb.Header{Timestamp: ts}, &pArgs) + _, respH, pErr = SendWrapped(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{Timestamp: ts}, &pArgs) if pErr != nil { t.Errorf("unexpected pError: %s", pErr) } @@ -1541,7 +1541,7 @@ func TestReplicaSequenceCacheReadError(t *testing.T) { args := incrementArgs(k, 1) txn.Sequence = 1 - if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ Txn: txn, }, &args); pErr != nil { t.Fatal(pErr) @@ -1559,7 +1559,7 @@ func TestReplicaSequenceCacheReadError(t *testing.T) { } // Now try increment again and verify error. - _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ Txn: txn, }, &args) if !testutils.IsPError(pErr, "replica corruption") { @@ -1582,7 +1582,7 @@ func TestReplicaSequenceCacheStoredTxnRetryError(t *testing.T) { _ = tc.rng.sequence.Put(tc.engine, nil, txn.ID, uint32(txn.Epoch), txn.Sequence, txn.Key, txn.Timestamp, roachpb.NewError(pastError)) args := incrementArgs(key, 1) - _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ Txn: txn, }, &args) if _, ok := pErr.GetDetail().(*roachpb.TransactionRetryError); !ok { @@ -1596,7 +1596,7 @@ func TestReplicaSequenceCacheStoredTxnRetryError(t *testing.T) { txn.Sequence = 321 args := incrementArgs(key, 1) try := func() *roachpb.Error { - _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ Txn: txn, }, &args) return pErr @@ -1643,7 +1643,7 @@ func TestTransactionRetryLeavesIntents(t *testing.T) { // Read from the key to increment the timestamp cache. gArgs := getArgs(key) - if _, pErr := client.SendWrapped(tc.rng, tc.rng.context(), &gArgs); pErr != nil { + if _, pErr := client.SendWrapped(tc.rng, tc.rng.context(context.Background()), &gArgs); pErr != nil { t.Fatal(pErr) } @@ -1657,13 +1657,13 @@ func TestTransactionRetryLeavesIntents(t *testing.T) { ba.Add(&btArgs) ba.Add(&pArgs) ba.Add(&etArgs) - _, pErr := tc.Sender().Send(tc.rng.context(), ba) + _, pErr := tc.Sender().Send(tc.rng.context(context.Background()), ba) if _, ok := pErr.GetDetail().(*roachpb.TransactionRetryError); !ok { t.Fatalf("expected retry error; got %s", pErr) } // Now verify that the intent was still written for key. - _, pErr = client.SendWrapped(tc.rng, tc.rng.context(), &gArgs) + _, pErr = client.SendWrapped(tc.rng, tc.rng.context(context.Background()), &gArgs) if _, ok := pErr.GetDetail().(*roachpb.WriteIntentError); !ok { t.Fatalf("expected write intent error; got %s", pErr) } @@ -1687,7 +1687,7 @@ func TestReplicaSequenceCacheOnlyWithIntent(t *testing.T) { args, h := heartbeatArgs(txn) // If the sequence cache were active for this request, we'd catch a txn retry. // Instead, we expect the error from heartbeating a nonexistent txn. - if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), h, &args); !testutils.IsPError(pErr, "record not present") { + if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), h, &args); !testutils.IsPError(pErr, "record not present") { t.Fatal(pErr) } } @@ -1707,7 +1707,7 @@ func TestEndTransactionDeadline(t *testing.T) { put := putArgs(key, key) _, header := beginTxnArgs(key, txn) - if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(), header, &put); pErr != nil { + if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(context.Background()), header, &put); pErr != nil { t.Fatal(pErr) } txn.Writing = true @@ -1731,7 +1731,7 @@ func TestEndTransactionDeadline(t *testing.T) { { txn.Sequence++ - _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), etHeader, &etArgs) + _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), etHeader, &etArgs) switch i { case 0: // No deadline. @@ -1770,7 +1770,7 @@ func TestEndTransactionWithMalformedSplitTrigger(t *testing.T) { txn := newTransaction("test", key, 1, roachpb.SERIALIZABLE, tc.clock) pArgs := putArgs(key, []byte("only here to make this a rw transaction")) txn.Sequence++ - if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(), roachpb.Header{ + if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ Txn: txn, }, &pArgs); pErr != nil { t.Fatal(pErr) @@ -1788,7 +1788,7 @@ func TestEndTransactionWithMalformedSplitTrigger(t *testing.T) { } txn.Sequence++ - if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), h, &args); !testutils.IsPError(pErr, "range does not match splits") { + if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), h, &args); !testutils.IsPError(pErr, "range does not match splits") { t.Errorf("expected range does not match splits error; got %s", pErr) } } @@ -1810,13 +1810,13 @@ func TestEndTransactionBeforeHeartbeat(t *testing.T) { txn := newTransaction("test", key, 1, roachpb.SERIALIZABLE, tc.clock) _, btH := beginTxnArgs(key, txn) put := putArgs(key, key) - if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(), btH, &put); pErr != nil { + if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(context.Background()), btH, &put); pErr != nil { t.Fatal(pErr) } txn.Sequence++ txn.Writing = true args, h := endTxnArgs(txn, commit) - resp, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), h, &args) + resp, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), h, &args) if pErr != nil { t.Error(pErr) } @@ -1835,7 +1835,7 @@ func TestEndTransactionBeforeHeartbeat(t *testing.T) { txn.Sequence++ hBA, h := heartbeatArgs(txn) - resp, pErr = client.SendWrappedWith(tc.Sender(), tc.rng.context(), h, &hBA) + resp, pErr = client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), h, &hBA) if pErr != nil { t.Error(pErr) } @@ -1860,7 +1860,7 @@ func TestEndTransactionAfterHeartbeat(t *testing.T) { txn := newTransaction("test", key, 1, roachpb.SERIALIZABLE, tc.clock) _, btH := beginTxnArgs(key, txn) put := putArgs(key, key) - if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(), btH, &put); pErr != nil { + if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(context.Background()), btH, &put); pErr != nil { t.Fatal(pErr) } @@ -1868,7 +1868,7 @@ func TestEndTransactionAfterHeartbeat(t *testing.T) { hBA, h := heartbeatArgs(txn) txn.Sequence++ - resp, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), h, &hBA) + resp, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), h, &hBA) if pErr != nil { t.Fatal(pErr) } @@ -1880,7 +1880,7 @@ func TestEndTransactionAfterHeartbeat(t *testing.T) { args, h := endTxnArgs(txn, commit) txn.Sequence++ - resp, pErr = client.SendWrappedWith(tc.Sender(), tc.rng.context(), h, &args) + resp, pErr = client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), h, &args) if pErr != nil { t.Error(pErr) } @@ -1928,21 +1928,21 @@ func TestEndTransactionWithPushedTimestamp(t *testing.T) { pusher.Priority = 2 // pusher will win _, btH := beginTxnArgs(key, pushee) put := putArgs(key, []byte("value")) - if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(), btH, &put); pErr != nil { + if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(context.Background()), btH, &put); pErr != nil { t.Fatal(pErr) } // Push pushee txn. pushTxn := pushTxnArgs(pusher, pushee, roachpb.PUSH_TIMESTAMP) pushTxn.Key = pusher.Key - if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &pushTxn); pErr != nil { + if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &pushTxn); pErr != nil { t.Error(pErr) } // End the transaction with args timestamp moved forward in time. endTxn, h := endTxnArgs(pushee, test.commit) pushee.Sequence++ - resp, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), h, &endTxn) + resp, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), h, &endTxn) if test.expErr { if _, ok := pErr.GetDetail().(*roachpb.TransactionRetryError); !ok { @@ -1977,7 +1977,7 @@ func TestEndTransactionWithIncrementedEpoch(t *testing.T) { txn := newTransaction("test", key, 1, roachpb.SERIALIZABLE, tc.clock) _, btH := beginTxnArgs(key, txn) put := putArgs(key, key) - if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(), btH, &put); pErr != nil { + if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(context.Background()), btH, &put); pErr != nil { t.Fatal(pErr) } txn.Writing = true @@ -1986,7 +1986,7 @@ func TestEndTransactionWithIncrementedEpoch(t *testing.T) { hBA, h := heartbeatArgs(txn) txn.Sequence++ - _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), h, &hBA) + _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), h, &hBA) if pErr != nil { t.Error(pErr) } @@ -1997,7 +1997,7 @@ func TestEndTransactionWithIncrementedEpoch(t *testing.T) { h.Txn.Priority = txn.Priority + 1 txn.Sequence++ - resp, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), h, &args) + resp, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), h, &args) if pErr != nil { t.Error(pErr) } @@ -2056,7 +2056,7 @@ func TestEndTransactionWithErrors(t *testing.T) { args, h := endTxnArgs(txn, true) txn.Sequence++ - if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), h, &args); !testutils.IsPError(pErr, test.expErrRegexp) { + if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), h, &args); !testutils.IsPError(pErr, test.expErrRegexp) { t.Errorf("expected error:\n%s\nto match:\n%s", pErr, test.expErrRegexp) } } @@ -2082,7 +2082,7 @@ func TestRaftReplayProtection(t *testing.T) { { // Start with an increment for key. incArgs := incrementArgs(key, incs[0]) - _, respH, pErr := SendWrapped(tc.Sender(), tc.rng.context(), roachpb.Header{}, &incArgs) + _, respH, pErr := SendWrapped(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{}, &incArgs) if pErr != nil { t.Fatal(pErr) } @@ -2091,7 +2091,7 @@ func TestRaftReplayProtection(t *testing.T) { // This will bump up to a higher timestamp than the original increment // and not surface a WriteTooOldError. h := roachpb.Header{Timestamp: respH.Timestamp.Prev()} - _, respH, pErr = SendWrapped(tc.Sender(), tc.rng.context(), h, &incArgs) + _, respH, pErr = SendWrapped(tc.Sender(), tc.rng.context(context.Background()), h, &incArgs) if pErr != nil { t.Fatalf("unexpected error: %s", respH) } @@ -2104,7 +2104,7 @@ func TestRaftReplayProtection(t *testing.T) { // encountered is an exact duplicate and nothing came before the // increment in the batch. h.Timestamp = respH.Timestamp - _, _, pErr = SendWrapped(tc.Sender(), tc.rng.context(), h, &incArgs) + _, _, pErr = SendWrapped(tc.Sender(), tc.rng.context(context.Background()), h, &incArgs) if _, ok := pErr.GetDetail().(*roachpb.WriteTooOldError); !ok { t.Fatalf("expected WriteTooOldError; got %s", pErr) } @@ -2117,7 +2117,7 @@ func TestRaftReplayProtection(t *testing.T) { incArgs := incrementArgs(key, inc) ba.Add(&incArgs) } - br, pErr := tc.Sender().Send(tc.rng.context(), ba) + br, pErr := tc.Sender().Send(tc.rng.context(context.Background()), ba) if pErr != nil { t.Fatalf("unexpected error: %s", pErr) } @@ -2129,7 +2129,7 @@ func TestRaftReplayProtection(t *testing.T) { // Now resend the batch with the same timestamp; this should look // like the replay it is and surface a WriteTooOldError. ba.Timestamp = br.Timestamp - _, pErr = tc.Sender().Send(tc.rng.context(), ba) + _, pErr = tc.Sender().Send(tc.rng.context(context.Background()), ba) if _, ok := pErr.GetDetail().(*roachpb.WriteTooOldError); !ok { t.Fatalf("expected WriteTooOldError; got %s", pErr) } @@ -2139,7 +2139,7 @@ func TestRaftReplayProtection(t *testing.T) { ba = roachpb.BatchRequest{} ba.Add(roachpb.NewDeleteRange(key, key.Next(), false)) ba.Add(&incArgs) - br, pErr = tc.Sender().Send(tc.rng.context(), ba) + br, pErr = tc.Sender().Send(tc.rng.context(context.Background()), ba) if pErr != nil { t.Fatalf("unexpected error: %s", pErr) } @@ -2147,7 +2147,7 @@ func TestRaftReplayProtection(t *testing.T) { // Send exact same batch; the DeleteRange should trip up and // we'll get a replay error. ba.Timestamp = br.Timestamp - _, pErr = tc.Sender().Send(tc.rng.context(), ba) + _, pErr = tc.Sender().Send(tc.rng.context(context.Background()), ba) if _, ok := pErr.GetDetail().(*roachpb.WriteTooOldError); !ok { t.Fatalf("expected WriteTooOldError; got %s", pErr) } @@ -2155,7 +2155,7 @@ func TestRaftReplayProtection(t *testing.T) { // Send just a DeleteRange batch. ba = roachpb.BatchRequest{} ba.Add(roachpb.NewDeleteRange(key, key.Next(), false)) - br, pErr = tc.Sender().Send(tc.rng.context(), ba) + br, pErr = tc.Sender().Send(tc.rng.context(context.Background()), ba) if pErr != nil { t.Fatalf("unexpected error: %s", pErr) } @@ -2164,7 +2164,7 @@ func TestRaftReplayProtection(t *testing.T) { // previous DeleteRange didn't leave any tombstones at this // timestamp for the replay to "trip" over. ba.Timestamp = br.Timestamp - _, pErr = tc.Sender().Send(tc.rng.context(), ba) + _, pErr = tc.Sender().Send(tc.rng.context(context.Background()), ba) if pErr != nil { t.Fatalf("unexpected error: %s", pErr) } @@ -2196,7 +2196,7 @@ func TestRaftReplayProtectionInTxn(t *testing.T) { ba.Add(&bt) ba.Add(&put) ba.Add(&et) - _, pErr := tc.Sender().Send(tc.rng.context(), ba) + _, pErr := tc.Sender().Send(tc.rng.context(context.Background()), ba) if pErr != nil { t.Fatalf("unexpected error: %s", pErr) } @@ -2205,7 +2205,7 @@ func TestRaftReplayProtectionInTxn(t *testing.T) { // Reach in and manually send to raft (to simulate Raft replay) and // also avoid updating the timestamp cache; verify WriteTooOldError. ba.Timestamp = txn.OrigTimestamp - pendingCmd, err := tc.rng.proposeRaftCommand(tc.rng.context(), ba) + pendingCmd, err := tc.rng.proposeRaftCommand(tc.rng.context(context.Background()), ba) if err != nil { t.Fatalf("%d: unexpected error: %s", i, err) } @@ -2238,7 +2238,7 @@ func TestReplayProtection(t *testing.T) { ba.Header = btH ba.Add(&bt) ba.Add(&put) - br, pErr := tc.Sender().Send(tc.rng.context(), ba) + br, pErr := tc.Sender().Send(tc.rng.context(context.Background()), ba) if pErr != nil { t.Fatalf("%d: unexpected error: %s", i, pErr) } @@ -2247,7 +2247,7 @@ func TestReplayProtection(t *testing.T) { putB := putArgs(keyB, []byte("value")) putTxn := br.Txn.Clone() putTxn.Sequence++ - _, respH, pErr := SendWrapped(tc.Sender(), tc.rng.context(), roachpb.Header{Txn: &putTxn}, &putB) + _, respH, pErr := SendWrapped(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{Txn: &putTxn}, &putB) if pErr != nil { t.Fatal(pErr) } @@ -2257,7 +2257,7 @@ func TestReplayProtection(t *testing.T) { etTxn.Sequence++ et, etH := endTxnArgs(&etTxn, true) et.IntentSpans = []roachpb.Span{{Key: key, EndKey: nil}, {Key: keyB, EndKey: nil}} - if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), etH, &et); pErr != nil { + if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), etH, &et); pErr != nil { t.Fatalf("%d: unexpected error: %s", i, pErr) } @@ -2270,33 +2270,33 @@ func TestReplayProtection(t *testing.T) { } // Now replay begin & put BeginTransaction should fail with a replay error. - _, pErr = tc.Sender().Send(tc.rng.context(), ba) + _, pErr = tc.Sender().Send(tc.rng.context(context.Background()), ba) if _, ok := pErr.GetDetail().(*roachpb.TransactionReplayError); !ok { t.Errorf("%d: expected transaction replay for iso=%s", i, iso) } // Intent should not have been created. gArgs := getArgs(key) - if _, pErr = client.SendWrapped(tc.rng, tc.rng.context(), &gArgs); pErr != nil { + if _, pErr = client.SendWrapped(tc.rng, tc.rng.context(context.Background()), &gArgs); pErr != nil { t.Errorf("%d: unexpected error reading key: %s", i, pErr) } // Send a put for keyB; should fail with a WriteTooOldError as this // will look like an obvious replay. - _, _, pErr = SendWrapped(tc.Sender(), tc.rng.context(), roachpb.Header{Txn: &putTxn}, &putB) + _, _, pErr = SendWrapped(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{Txn: &putTxn}, &putB) if _, ok := pErr.GetDetail().(*roachpb.WriteTooOldError); !ok { t.Errorf("%d: expected write too old error for iso=%s; got %s", i, iso, pErr) } // EndTransaction should also fail, but with a status error (does not exist). - _, pErr = client.SendWrappedWith(tc.Sender(), tc.rng.context(), etH, &et) + _, pErr = client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), etH, &et) if _, ok := pErr.GetDetail().(*roachpb.TransactionStatusError); !ok { t.Errorf("%d: expected transaction aborted for iso=%s; got %s", i, iso, pErr) } // Expect that keyB intent did not get written! gArgs = getArgs(keyB) - if _, pErr = client.SendWrapped(tc.rng, tc.rng.context(), &gArgs); pErr != nil { + if _, pErr = client.SendWrapped(tc.rng, tc.rng.context(context.Background()), &gArgs); pErr != nil { t.Errorf("%d: unexpected error reading keyB: %s", i, pErr) } } @@ -2343,14 +2343,14 @@ func TestEndTransactionLocalGC(t *testing.T) { txn := newTransaction("test", key, 1, roachpb.SERIALIZABLE, tc.clock) _, btH := beginTxnArgs(key, txn) put := putArgs(putKey, key) - if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(), btH, &put); pErr != nil { + if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(context.Background()), btH, &put); pErr != nil { t.Fatal(pErr) } putKey = putKey.Next() // for the next iteration args, h := endTxnArgs(txn, true) args.IntentSpans = test.intents txn.Sequence++ - if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), h, &args); pErr != nil { + if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), h, &args); pErr != nil { t.Fatal(pErr) } var readTxn roachpb.Transaction @@ -2377,13 +2377,13 @@ func setupResolutionTest(t *testing.T, tc testContext, key roachpb.Key, splitKey txn.Epoch++ pArgs := putArgs(key, []byte("value")) h := roachpb.Header{Txn: txn} - if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(), h, &pArgs); pErr != nil { + if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(context.Background()), h, &pArgs); pErr != nil { t.Fatal(pErr) } pArgs = putArgs(splitKey.AsRawKey(), []byte("value")) txn.Sequence++ - if _, pErr := client.SendWrappedWith(newRng, newRng.context(), h, &pArgs); pErr != nil { + if _, pErr := client.SendWrappedWith(newRng, newRng.context(context.Background()), h, &pArgs); pErr != nil { t.Fatal(pErr) } @@ -2391,7 +2391,7 @@ func setupResolutionTest(t *testing.T, tc testContext, key roachpb.Key, splitKey args, h := endTxnArgs(txn, true /* commit */) args.IntentSpans = []roachpb.Span{{Key: key, EndKey: splitKey.Next().AsRawKey()}} txn.Sequence++ - if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), h, &args); pErr != nil { + if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), h, &args); pErr != nil { t.Fatal(pErr) } return newRng, txn @@ -2421,14 +2421,14 @@ func TestEndTransactionResolveOnlyLocalIntents(t *testing.T) { // Check if the intent in the other range has not yet been resolved. gArgs := getArgs(splitKey) - _, pErr := client.SendWrapped(newRng, newRng.context(), &gArgs) + _, pErr := client.SendWrapped(newRng, newRng.context(context.Background()), &gArgs) if _, ok := pErr.GetDetail().(*roachpb.WriteIntentError); !ok { t.Errorf("expected write intent error, but got %s", pErr) } txn.Sequence++ hbArgs, h := heartbeatArgs(txn) - reply, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), h, &hbArgs) + reply, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), h, &hbArgs) if pErr != nil { t.Fatal(pErr) } @@ -2759,7 +2759,7 @@ func TestPushTxnBadKey(t *testing.T) { args := pushTxnArgs(pusher, pushee, roachpb.PUSH_ABORT) args.Key = pusher.Key - if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &args); !testutils.IsPError(pErr, ".*should match pushee.*") { + if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &args); !testutils.IsPError(pErr, ".*should match pushee.*") { t.Errorf("unexpected error %s", pErr) } } @@ -2788,19 +2788,19 @@ func TestPushTxnAlreadyCommittedOrAborted(t *testing.T) { // Begin the pushee's transaction. _, btH := beginTxnArgs(key, pushee) put := putArgs(key, key) - if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(), btH, &put); pErr != nil { + if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(context.Background()), btH, &put); pErr != nil { t.Fatal(pErr) } // End the pushee's transaction. etArgs, h := endTxnArgs(pushee, status == roachpb.COMMITTED) pushee.Sequence++ - if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), h, &etArgs); pErr != nil { + if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), h, &etArgs); pErr != nil { t.Fatal(pErr) } // Now try to push what's already committed or aborted. args := pushTxnArgs(pusher, pushee, roachpb.PUSH_ABORT) - resp, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &args) + resp, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &args) if pErr != nil { t.Fatal(pErr) } @@ -2848,7 +2848,7 @@ func TestPushTxnUpgradeExistingTxn(t *testing.T) { pushee.LastHeartbeat = &test.startTS _, btH := beginTxnArgs(key, pushee) put := putArgs(key, key) - if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(), btH, &put); pErr != nil { + if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(context.Background()), btH, &put); pErr != nil { t.Fatal(pErr) } @@ -2856,7 +2856,7 @@ func TestPushTxnUpgradeExistingTxn(t *testing.T) { pushee.Timestamp = test.ts args := pushTxnArgs(pusher, pushee, roachpb.PUSH_ABORT) - resp, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &args) + resp, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &args) if pErr != nil { t.Fatal(pErr) } @@ -2932,7 +2932,7 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) { _, btH := beginTxnArgs(key, pushee) btH.Timestamp = tc.rng.store.Clock().Now() put := putArgs(key, key) - if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(), btH, &put); pErr != nil { + if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(context.Background()), btH, &put); pErr != nil { t.Fatal(pErr) } @@ -2941,7 +2941,7 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) { args.Now = roachpb.Timestamp{WallTime: test.currentTime} args.PushTo = args.Now - reply, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &args) + reply, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &args) if test.expSuccess != (pErr == nil) { t.Errorf("%d: expSuccess=%t; got pErr %s", i, test.expSuccess, pErr) @@ -3018,13 +3018,13 @@ func TestPushTxnPriorities(t *testing.T) { _, btH := beginTxnArgs(key, pushee) put := putArgs(key, key) - if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(), btH, &put); pErr != nil { + if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(context.Background()), btH, &put); pErr != nil { t.Fatal(pErr) } // Now, attempt to push the transaction with intent epoch set appropriately. args := pushTxnArgs(pusher, pushee, test.pushType) - _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &args) + _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &args) if test.expSuccess != (pErr == nil) { t.Errorf("expected success on trial %d? %t; got err %s", i, test.expSuccess, pErr) @@ -3057,7 +3057,7 @@ func TestPushTxnPushTimestamp(t *testing.T) { key := roachpb.Key("a") _, btH := beginTxnArgs(key, pushee) put := putArgs(key, key) - if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(), btH, &put); pErr != nil { + if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(context.Background()), btH, &put); pErr != nil { t.Fatal(pErr) } pushee.Writing = true @@ -3065,7 +3065,7 @@ func TestPushTxnPushTimestamp(t *testing.T) { // Now, push the transaction with args.Abort=false. args := pushTxnArgs(pusher, pushee, roachpb.PUSH_TIMESTAMP) - resp, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &args) + resp, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &args) if pErr != nil { t.Errorf("unexpected error on push: %s", pErr) } @@ -3100,14 +3100,14 @@ func TestPushTxnPushTimestampAlreadyPushed(t *testing.T) { key := roachpb.Key("a") _, btH := beginTxnArgs(key, pushee) put := putArgs(key, key) - if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(), btH, &put); pErr != nil { + if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(context.Background()), btH, &put); pErr != nil { t.Fatal(pErr) } // Now, push the transaction with args.Abort=false. args := pushTxnArgs(pusher, pushee, roachpb.PUSH_TIMESTAMP) - resp, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &args) + resp, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &args) if pErr != nil { t.Errorf("unexpected pError on push: %s", pErr) } @@ -3141,14 +3141,14 @@ func TestPushTxnSerializableRestart(t *testing.T) { // Read from the key to increment the timestamp cache. gArgs := getArgs(key) - if _, pErr := client.SendWrapped(tc.rng, tc.rng.context(), &gArgs); pErr != nil { + if _, pErr := client.SendWrapped(tc.rng, tc.rng.context(context.Background()), &gArgs); pErr != nil { t.Fatal(pErr) } // Begin the pushee's transaction & write to key. btArgs, btH := beginTxnArgs(key, pushee) put := putArgs(key, []byte("foo")) - resp, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(), btH, &put) + resp, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(context.Background()), btH, &put) if pErr != nil { t.Fatal(pErr) } @@ -3157,7 +3157,7 @@ func TestPushTxnSerializableRestart(t *testing.T) { // Try to end the pushee's transaction; should get a retry failure. etArgs, h := endTxnArgs(pushee, true /* commit */) pushee.Sequence++ - _, pErr = client.SendWrappedWith(tc.Sender(), tc.rng.context(), h, &etArgs) + _, pErr = client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), h, &etArgs) if _, ok := pErr.GetDetail().(*roachpb.TransactionRetryError); !ok { t.Fatalf("expected retry error; got %s", pErr) } @@ -3167,7 +3167,7 @@ func TestPushTxnSerializableRestart(t *testing.T) { // Next push pushee to advance timestamp of txn record. pusher.Timestamp = tc.rng.store.Clock().Now() args := pushTxnArgs(pusher, &pusheeCopy, roachpb.PUSH_TIMESTAMP) - if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &args); pErr != nil { + if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &args); pErr != nil { t.Fatal(pErr) } @@ -3179,7 +3179,7 @@ func TestPushTxnSerializableRestart(t *testing.T) { ba.Add(&btArgs) ba.Add(&put) ba.Add(&etArgs) - _, pErr = tc.Sender().Send(tc.rng.context(), ba) + _, pErr = tc.Sender().Send(tc.rng.context(context.Background()), ba) if _, ok := pErr.GetDetail().(*roachpb.TransactionRetryError); !ok { t.Fatalf("expected retry error; got %s", pErr) } @@ -3205,7 +3205,7 @@ func TestReplicaResolveIntentRange(t *testing.T) { for _, key := range keys { pArgs := putArgs(key, []byte("value1")) txn.Sequence++ - if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{Txn: txn}, &pArgs); pErr != nil { + if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{Txn: txn}, &pArgs); pErr != nil { t.Fatal(pErr) } } @@ -3219,7 +3219,7 @@ func TestReplicaResolveIntentRange(t *testing.T) { IntentTxn: txn.TxnMeta, Status: roachpb.COMMITTED, } - if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), rArgs); pErr != nil { + if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), rArgs); pErr != nil { t.Fatal(pErr) } @@ -3262,7 +3262,7 @@ func TestReplicaStatsComputation(t *testing.T) { // Put a value. pArgs := putArgs([]byte("a"), []byte("value1")) - if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &pArgs); pErr != nil { + if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &pArgs); pErr != nil { t.Fatal(pErr) } expMS := engine.MVCCStats{LiveBytes: 25, KeyBytes: 14, ValBytes: 11, IntentBytes: 0, LiveCount: 1, KeyCount: 1, ValCount: 1, IntentCount: 0, IntentAge: 0, GCBytesAge: 0, SysBytes: 81, SysCount: 2, LastUpdateNanos: 0} @@ -3280,7 +3280,7 @@ func TestReplicaStatsComputation(t *testing.T) { txn := newTransaction("test", pArgs.Key, 1, roachpb.SERIALIZABLE, tc.clock) txn.ID = uuid - if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{Txn: txn}, &pArgs); pErr != nil { + if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{Txn: txn}, &pArgs); pErr != nil { t.Fatal(pErr) } expMS = engine.MVCCStats{LiveBytes: 95, KeyBytes: 28, ValBytes: 67, IntentBytes: 23, LiveCount: 2, KeyCount: 2, ValCount: 2, IntentCount: 1, IntentAge: 0, GCBytesAge: 0, SysBytes: 145, SysCount: 3, LastUpdateNanos: 0} @@ -3295,7 +3295,7 @@ func TestReplicaStatsComputation(t *testing.T) { Status: roachpb.COMMITTED, } - if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), rArgs); pErr != nil { + if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), rArgs); pErr != nil { t.Fatal(pErr) } expMS = engine.MVCCStats{LiveBytes: 50, KeyBytes: 28, ValBytes: 22, IntentBytes: 0, LiveCount: 2, KeyCount: 2, ValCount: 2, IntentCount: 0, IntentAge: 0, GCBytesAge: 0, SysBytes: 81, SysCount: 2, LastUpdateNanos: 0} @@ -3304,7 +3304,7 @@ func TestReplicaStatsComputation(t *testing.T) { // Delete the 1st value. dArgs := deleteArgs([]byte("a")) - if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &dArgs); pErr != nil { + if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &dArgs); pErr != nil { t.Fatal(pErr) } expMS = engine.MVCCStats{LiveBytes: 25, KeyBytes: 40, ValBytes: 22, IntentBytes: 0, LiveCount: 1, KeyCount: 2, ValCount: 3, IntentCount: 0, IntentAge: 0, GCBytesAge: 0, SysBytes: 81, SysCount: 2, LastUpdateNanos: 0} @@ -3328,14 +3328,14 @@ func TestMerge(t *testing.T) { for _, str := range stringArgs { mergeArgs := internalMergeArgs(key, roachpb.MakeValueFromString(str)) - if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &mergeArgs); pErr != nil { + if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &mergeArgs); pErr != nil { t.Fatalf("unexpected error from Merge: %s", pErr) } } getArgs := getArgs(key) - reply, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &getArgs) + reply, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &getArgs) if pErr != nil { t.Fatalf("unexpected error from Get: %s", pErr) } @@ -3367,7 +3367,7 @@ func TestTruncateLog(t *testing.T) { for i := 0; i < 10; i++ { args := incrementArgs([]byte("a"), int64(i)) - if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &args); pErr != nil { + if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &args); pErr != nil { t.Fatal(pErr) } idx, err := tc.rng.GetLastIndex() @@ -3381,7 +3381,7 @@ func TestTruncateLog(t *testing.T) { // Discard the first half of the log. truncateArgs := truncateLogArgs(indexes[5], rangeID) - if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &truncateArgs); pErr != nil { + if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &truncateArgs); pErr != nil { t.Fatal(pErr) } @@ -3435,14 +3435,14 @@ func TestTruncateLog(t *testing.T) { // Truncating logs that have already been truncated should not return an // error. truncateArgs = truncateLogArgs(indexes[3], rangeID) - if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &truncateArgs); pErr != nil { + if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &truncateArgs); pErr != nil { t.Fatal(pErr) } // Truncating logs that have the wrong rangeID included should not return // an error but should not truncate any logs. truncateArgs = truncateLogArgs(indexes[9], rangeID+1) - if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &truncateArgs); pErr != nil { + if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &truncateArgs); pErr != nil { t.Fatal(pErr) } @@ -3470,7 +3470,7 @@ func TestConditionFailedError(t *testing.T) { value := []byte("quack") pArgs := putArgs(key, value) - if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &pArgs); pErr != nil { + if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &pArgs); pErr != nil { t.Fatal(pErr) } val := roachpb.MakeValueFromString("moo") @@ -3482,7 +3482,7 @@ func TestConditionFailedError(t *testing.T) { ExpValue: &val, } - _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{Timestamp: roachpb.MinTimestamp}, &args) + _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{Timestamp: roachpb.MinTimestamp}, &args) if cErr, ok := pErr.GetDetail().(*roachpb.ConditionFailedError); pErr == nil || !ok { t.Fatalf("expected ConditionFailedError, got %T with content %+v", @@ -3535,7 +3535,7 @@ func TestAppliedIndex(t *testing.T) { for i := int64(1); i <= 10; i++ { args := incrementArgs([]byte("a"), i) - resp, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &args) + resp, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &args) if pErr != nil { t.Fatal(pErr) } @@ -3576,13 +3576,13 @@ func TestReplicaCorruption(t *testing.T) { // First send a regular command. args := putArgs(roachpb.Key("test1"), []byte("value")) - if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &args); pErr != nil { + if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &args); pErr != nil { t.Fatal(pErr) } // maybeSetCorrupt should have been called. args = putArgs(roachpb.Key("boom"), []byte("value")) - _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &args) + _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &args) if !testutils.IsPError(pErr, "replica corruption \\(processed=true\\)") { t.Fatalf("unexpected error: %s", pErr) } @@ -3642,7 +3642,7 @@ func testRangeDanglingMetaIntent(t *testing.T, isReverse bool) { var rlReply *roachpb.RangeLookupResponse - reply, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + reply, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ ReadConsistency: roachpb.INCONSISTENT, }, rlArgs) if pErr != nil { @@ -3667,7 +3667,7 @@ func testRangeDanglingMetaIntent(t *testing.T, isReverse bool) { // priority). pArgs := putArgs(keys.RangeMetaKey(roachpb.RKey(key)), data) txn.Sequence++ - if _, pErr = maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(), roachpb.Header{Txn: txn}, &pArgs); pErr != nil { + if _, pErr = maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{Txn: txn}, &pArgs); pErr != nil { t.Fatal(pErr) } @@ -3676,7 +3676,7 @@ func testRangeDanglingMetaIntent(t *testing.T, isReverse bool) { // Note that 'A' < 'a'. rlArgs.Key = keys.RangeMetaKey(roachpb.RKey{'A'}) - reply, pErr = client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + reply, pErr = client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ Timestamp: roachpb.MinTimestamp, ReadConsistency: roachpb.INCONSISTENT, }, rlArgs) @@ -3689,7 +3689,7 @@ func testRangeDanglingMetaIntent(t *testing.T, isReverse bool) { } // Switch to consistent lookups, which should run into the intent. - _, pErr = client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + _, pErr = client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ ReadConsistency: roachpb.CONSISTENT, }, rlArgs) if _, ok := pErr.GetDetail().(*roachpb.WriteIntentError); !ok { @@ -3699,7 +3699,7 @@ func testRangeDanglingMetaIntent(t *testing.T, isReverse bool) { // Try 100 lookups with IgnoreIntents. Expect to see each descriptor at least once. // First, try this consistently, which should not be allowed. rlArgs.ConsiderIntents = true - _, pErr = client.SendWrapped(tc.Sender(), tc.rng.context(), rlArgs) + _, pErr = client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), rlArgs) if !testutils.IsPError(pErr, "can not read consistently and special-case intents") { t.Fatalf("wanted specific error, not %s", pErr) } @@ -3709,7 +3709,7 @@ func testRangeDanglingMetaIntent(t *testing.T, isReverse bool) { for !(origSeen && newSeen) { clonedRLArgs := *rlArgs - reply, pErr = client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + reply, pErr = client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ ReadConsistency: roachpb.INCONSISTENT, }, &clonedRLArgs) if pErr != nil { @@ -3774,7 +3774,7 @@ func TestReplicaLookupUseReverseScan(t *testing.T) { pArgs := putArgs(keys.RangeMetaKey(roachpb.RKey(r.EndKey)), data) txn.Sequence++ - if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{Txn: txn}, &pArgs); pErr != nil { + if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{Txn: txn}, &pArgs); pErr != nil { t.Fatal(pErr) } } @@ -3789,7 +3789,7 @@ func TestReplicaLookupUseReverseScan(t *testing.T) { IntentTxn: txn.TxnMeta, Status: roachpb.COMMITTED, } - if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), rArgs); pErr != nil { + if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), rArgs); pErr != nil { t.Fatal(pErr) } @@ -3804,7 +3804,7 @@ func TestReplicaLookupUseReverseScan(t *testing.T) { for _, c := range testCases { clonedRLArgs := *rlArgs clonedRLArgs.Key = keys.RangeMetaKey(roachpb.RKey(c.key)) - reply, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + reply, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ ReadConsistency: roachpb.INCONSISTENT, }, &clonedRLArgs) if pErr != nil { @@ -3825,7 +3825,7 @@ func TestReplicaLookupUseReverseScan(t *testing.T) { } pArgs := putArgs(keys.RangeMetaKey(roachpb.RKey(intentRange.EndKey)), data) txn2 := newTransaction("test", roachpb.Key{}, 1, roachpb.SERIALIZABLE, tc.clock) - if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{Txn: txn2}, &pArgs); pErr != nil { + if _, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{Txn: txn2}, &pArgs); pErr != nil { t.Fatal(pErr) } @@ -3833,7 +3833,7 @@ func TestReplicaLookupUseReverseScan(t *testing.T) { for _, c := range testCases { clonedRLArgs := *rlArgs clonedRLArgs.Key = keys.RangeMetaKey(roachpb.RKey(c.key)) - reply, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{ + reply, pErr := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{ ReadConsistency: roachpb.INCONSISTENT, }, &clonedRLArgs) if pErr != nil { @@ -4025,7 +4025,7 @@ func TestBatchErrorWithIndex(t *testing.T) { Span: roachpb.Span{Key: roachpb.Key("k")}, }) - if _, pErr := tc.Sender().Send(tc.rng.context(), ba); pErr == nil { + if _, pErr := tc.Sender().Send(tc.rng.context(context.Background()), ba); pErr == nil { t.Fatal("expected an error") } else if pErr.Index == nil || pErr.Index.Index != 1 || !testutils.IsPError(pErr, "unexpected value") { t.Fatalf("invalid index or error type: %s", pErr) @@ -4051,7 +4051,7 @@ func TestReplicaLoadSystemConfigSpanIntent(t *testing.T) { _, btH := beginTxnArgs(key, newTransaction("test", key, 1, roachpb.SERIALIZABLE, rng.store.Clock())) btH.Txn.Priority = 1 // low so it can be pushed put := putArgs(key, []byte("foo")) - if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(), btH, &put); pErr != nil { + if _, pErr := maybeWrapWithBeginTransaction(tc.Sender(), tc.rng.context(context.Background()), btH, &put); pErr != nil { t.Fatal(pErr) } @@ -4061,7 +4061,7 @@ func TestReplicaLoadSystemConfigSpanIntent(t *testing.T) { pusher := newTransaction("test", key, 1, roachpb.SERIALIZABLE, rng.store.Clock()) pusher.Priority = 2 // will push successfully pushArgs := pushTxnArgs(pusher, btH.Txn, roachpb.PUSH_ABORT) - if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &pushArgs); pErr != nil { + if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &pushArgs); pErr != nil { t.Fatal(pErr) } @@ -4133,7 +4133,7 @@ func TestEntries(t *testing.T) { for i := 0; i < 10; i++ { args := incrementArgs([]byte("a"), int64(i)) - if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &args); pErr != nil { + if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &args); pErr != nil { t.Fatal(pErr) } idx, err := tc.rng.GetLastIndex() @@ -4148,7 +4148,7 @@ func TestEntries(t *testing.T) { // Discard the first half of the log. truncateArgs := truncateLogArgs(indexes[5], rangeID) - if _, pErr := client.SendWrapped(tc.Sender(), rng.context(), &truncateArgs); pErr != nil { + if _, pErr := client.SendWrapped(tc.Sender(), rng.context(context.Background()), &truncateArgs); pErr != nil { t.Fatal(pErr) } @@ -4257,7 +4257,7 @@ func TestTerm(t *testing.T) { for i := 0; i < 10; i++ { args := incrementArgs([]byte("a"), int64(i)) - if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(), &args); pErr != nil { + if _, pErr := client.SendWrapped(tc.Sender(), tc.rng.context(context.Background()), &args); pErr != nil { t.Fatal(pErr) } idx, err := tc.rng.GetLastIndex() @@ -4269,7 +4269,7 @@ func TestTerm(t *testing.T) { // Discard the first half of the log. truncateArgs := truncateLogArgs(indexes[5], rangeID) - if _, pErr := client.SendWrapped(tc.Sender(), rng.context(), &truncateArgs); pErr != nil { + if _, pErr := client.SendWrapped(tc.Sender(), rng.context(context.Background()), &truncateArgs); pErr != nil { t.Fatal(pErr) } @@ -4345,10 +4345,10 @@ func TestGCIncorrectRange(t *testing.T) { ts2 := makeTS(2, 0) ts1Header := roachpb.Header{Timestamp: ts1} ts2Header := roachpb.Header{Timestamp: ts2} - if _, pErr := client.SendWrappedWith(rng2, rng2.context(), ts1Header, &putReq); pErr != nil { + if _, pErr := client.SendWrappedWith(rng2, rng2.context(context.Background()), ts1Header, &putReq); pErr != nil { t.Errorf("unexpected pError on put key request: %s", pErr) } - if _, pErr := client.SendWrappedWith(rng2, rng2.context(), ts2Header, &putReq); pErr != nil { + if _, pErr := client.SendWrappedWith(rng2, rng2.context(context.Background()), ts2Header, &putReq); pErr != nil { t.Errorf("unexpected pError on put key request: %s", pErr) } @@ -4357,13 +4357,13 @@ func TestGCIncorrectRange(t *testing.T) { // the request for the incorrect key will be silently dropped. gKey := gcKey(key, ts1) gcReq := gcArgs(rng1.Desc().StartKey, rng1.Desc().EndKey, gKey) - if _, pErr := client.SendWrapped(rng1, rng1.context(), &gcReq); pErr != nil { + if _, pErr := client.SendWrapped(rng1, rng1.context(context.Background()), &gcReq); pErr != nil { t.Errorf("unexpected pError on garbage collection request to incorrect range: %s", pErr) } // Make sure the key still exists on range 2. getReq := getArgs(key) - if res, pErr := client.SendWrappedWith(rng2, rng2.context(), ts1Header, &getReq); pErr != nil { + if res, pErr := client.SendWrappedWith(rng2, rng2.context(context.Background()), ts1Header, &getReq); pErr != nil { t.Errorf("unexpected pError on get request to correct range: %s", pErr) } else if resVal := res.(*roachpb.GetResponse).Value; resVal == nil { t.Errorf("expected value %s to exists after GC to incorrect range but before GC to correct range, found %v", val, resVal) @@ -4371,12 +4371,12 @@ func TestGCIncorrectRange(t *testing.T) { // Send GC request to range 2 for the same key. gcReq = gcArgs(rng2.Desc().StartKey, rng2.Desc().EndKey, gKey) - if _, pErr := client.SendWrapped(rng2, rng2.context(), &gcReq); pErr != nil { + if _, pErr := client.SendWrapped(rng2, rng2.context(context.Background()), &gcReq); pErr != nil { t.Errorf("unexpected pError on garbage collection request to correct range: %s", pErr) } // Make sure the key no longer exists on range 2. - if res, pErr := client.SendWrappedWith(rng2, rng2.context(), ts1Header, &getReq); pErr != nil { + if res, pErr := client.SendWrappedWith(rng2, rng2.context(context.Background()), ts1Header, &getReq); pErr != nil { t.Errorf("unexpected pError on get request to correct range: %s", pErr) } else if resVal := res.(*roachpb.GetResponse).Value; resVal != nil { t.Errorf("expected value at key %s to no longer exist after GC to correct range, found value %v", key, resVal) @@ -4476,14 +4476,14 @@ func TestComputeVerifyChecksum(t *testing.T) { rng := tc.rng incArgs := incrementArgs([]byte("a"), 23) - if _, err := client.SendWrapped(tc.Sender(), rng.context(), &incArgs); err != nil { + if _, err := client.SendWrapped(tc.Sender(), rng.context(context.Background()), &incArgs); err != nil { t.Fatal(err) } initialChecksum := verifyChecksum(t, rng) // Getting a value will not affect the snapshot checksum gArgs := getArgs(roachpb.Key("a")) - if _, err := client.SendWrapped(tc.Sender(), rng.context(), &gArgs); err != nil { + if _, err := client.SendWrapped(tc.Sender(), rng.context(context.Background()), &gArgs); err != nil { t.Fatal(err) } checksum := verifyChecksum(t, rng) @@ -4494,7 +4494,7 @@ func TestComputeVerifyChecksum(t *testing.T) { // Modifying the range will change the checksum. incArgs = incrementArgs([]byte("a"), 5) - if _, err := client.SendWrapped(tc.Sender(), rng.context(), &incArgs); err != nil { + if _, err := client.SendWrapped(tc.Sender(), rng.context(context.Background()), &incArgs); err != nil { t.Fatal(err) } checksum = verifyChecksum(t, rng) diff --git a/storage/split_queue.go b/storage/split_queue.go index 344392f747fd..737bfd35cdd9 100644 --- a/storage/split_queue.go +++ b/storage/split_queue.go @@ -19,6 +19,8 @@ package storage import ( "time" + "golang.org/x/net/context" + "github.com/cockroachdb/cockroach/client" "github.com/cockroachdb/cockroach/config" "github.com/cockroachdb/cockroach/gossip" @@ -89,6 +91,7 @@ func (*splitQueue) shouldQueue(now roachpb.Timestamp, rng *Replica, // process synchronously invokes admin split for each proposed split key. func (sq *splitQueue) process(now roachpb.Timestamp, rng *Replica, sysCfg config.SystemConfig) error { + ctx := rng.context(context.TODO()) // First handle case of splitting due to zone config maps. desc := rng.Desc() @@ -111,7 +114,7 @@ func (sq *splitQueue) process(now roachpb.Timestamp, rng *Replica, // FIXME: why is this implementation not the same as the one above? if float64(rng.stats.GetSize())/float64(zone.RangeMaxBytes) > 1 { log.Infof("splitting %s size=%d max=%d", rng, rng.stats.GetSize(), zone.RangeMaxBytes) - if _, pErr := client.SendWrapped(rng, rng.context(), &roachpb.AdminSplitRequest{ + if _, pErr := client.SendWrapped(rng, ctx, &roachpb.AdminSplitRequest{ Span: roachpb.Span{Key: desc.StartKey.AsRawKey()}, }); pErr != nil { return pErr.GoError() diff --git a/storage/store.go b/storage/store.go index 6e79691809cf..bd1af8254a50 100644 --- a/storage/store.go +++ b/storage/store.go @@ -625,11 +625,11 @@ func (s *Store) String() string { return fmt.Sprintf("store=%d:%d (%s)", s.Ident.NodeID, s.Ident.StoreID, s.engine) } -// Context returns a base context to pass along with commands being executed, -// derived from the supplied context (which is allowed to be nil). -func (s *Store) Context(ctx context.Context) context.Context { +// context returns a base context to pass along with commands being executed, +// derived from the supplied context (which is not allowed to be nil). +func (s *Store) context(ctx context.Context) context.Context { if ctx == nil { - ctx = context.Background() + panic("ctx cannot be nil") } return log.Add(ctx, log.NodeID, s.Ident.NodeID, @@ -836,7 +836,7 @@ func (s *Store) WaitForInit() { // whether the store has a first range or config replica and asks those ranges // to gossip accordingly. func (s *Store) startGossip() { - ctx := s.Context(nil) + ctx := s.context(context.TODO()) // Periodic updates run in a goroutine and signal a WaitGroup upon completion // of their first iteration. s.initComplete.Add(2) @@ -921,7 +921,7 @@ func (s *Store) maybeGossipSystemConfig() error { // gossip. If an unexpected error occurs (i.e. nobody else seems to // have an active lease but we still failed to obtain it), return // that error. - _, pErr := rng.getLeaseForGossip(s.Context(nil)) + _, pErr := rng.getLeaseForGossip(s.context(context.TODO())) return pErr.GoError() } @@ -941,7 +941,7 @@ func (s *Store) systemGossipUpdate(cfg config.SystemConfig) { // GossipStore broadcasts the store on the gossip network. func (s *Store) GossipStore() { - ctx := s.Context(nil) + ctx := s.context(context.TODO()) storeDesc, err := s.Descriptor() if err != nil { @@ -1501,7 +1501,7 @@ func (s *Store) ReplicaCount() int { // timestamp (for instance due to the timestamp cache), the response will have // a transaction set which should be used to update the client transaction. func (s *Store) Send(ctx context.Context, ba roachpb.BatchRequest) (br *roachpb.BatchResponse, pErr *roachpb.Error) { - ctx = s.Context(ctx) + ctx = s.context(ctx) for _, union := range ba.Requests { arg := union.GetInner() diff --git a/storage/stores.go b/storage/stores.go index 7dedd5629818..ddf3cc0601c2 100644 --- a/storage/stores.go +++ b/storage/stores.go @@ -246,7 +246,7 @@ func (ls *Stores) RangeLookup( ConsiderIntents: considerIntents, Reverse: useReverseScan, }) - br, pErr := ls.Send(context.Background(), ba) + br, pErr := ls.Send(context.TODO(), ba) if pErr != nil { return nil, pErr }