Skip to content

Commit

Permalink
storage: Link Node.Batch context correctly to timeout cmds
Browse files Browse the repository at this point in the history
In cockroachdb#5368, I was seeing that a call to `redirectOnOrAcquireLeaderLease`
had been stuck for 541 minutes. This function selects on a leader lease
channel, but also selects on a context cancellation. This means that the
context should have timed out. It looks like we had dropped the original
context with timeout in `Node.Batch`, which came from `kv.sendOne`. This
change should properly link these two contexts together so that the
timeout in the stuck command would work correctly.
  • Loading branch information
nvanbenschoten committed Mar 25, 2016
1 parent 888b548 commit 4b0baf5
Show file tree
Hide file tree
Showing 13 changed files with 210 additions and 191 deletions.
20 changes: 13 additions & 7 deletions server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,13 @@ func NewNode(ctx storage.StoreContext, recorder *status.MetricsRecorder, stopper
return n
}

// context returns a context encapsulating the NodeID.
func (n *Node) context() context.Context {
return log.Add(context.Background(), log.NodeID, n.Descriptor.NodeID)
// context returns a context encapsulating the NodeID, derived from the
// supplied context (which is not allowed to be nil).
func (n *Node) context(ctx context.Context) context.Context {
if ctx == nil {
panic("ctx cannot be nil")
}
return log.Add(ctx, log.NodeID, n.Descriptor.NodeID)
}

// initDescriptor initializes the node descriptor with the server
Expand Down Expand Up @@ -330,7 +334,7 @@ func (n *Node) start(addr net.Addr, engines []engine.Engine, attrs roachpb.Attri
// Record node started event.
n.recordJoinEvent()

log.Infoc(n.context(), "Started node with %v engine(s) and attributes %v", engines, attrs.Attrs)
log.Infoc(n.context(context.TODO()), "Started node with %v engine(s) and attributes %v", engines, attrs.Attrs)
return nil
}

Expand Down Expand Up @@ -703,6 +707,8 @@ func (n *Node) Batch(ctx context.Context, args *roachpb.BatchRequest) (*roachpb.
// back with the response. This is more expensive, but then again,
// those are individual requests traced by users, so they can be.
if sp.BaggageItem(tracing.Snowball) != "" {
sp.LogEvent("delegating to snowball tracing")
sp.Finish()
if sp, err = tracing.JoinOrNewSnowball(opName, args.Trace, func(rawSpan basictracer.RawSpan) {
encSp, err := tracing.EncodeRawSpan(&rawSpan, nil)
if err != nil {
Expand All @@ -715,14 +721,14 @@ func (n *Node) Batch(ctx context.Context, args *roachpb.BatchRequest) (*roachpb.
}
}
defer sp.Finish()
ctx := opentracing.ContextWithSpan((*Node)(n).context(), sp)
traceCtx := opentracing.ContextWithSpan(n.context(ctx), sp)

tStart := timeutil.Now()
var pErr *roachpb.Error
br, pErr = n.stores.Send(ctx, *args)
br, pErr = n.stores.Send(traceCtx, *args)
if pErr != nil {
br = &roachpb.BatchResponse{}
log.Trace(ctx, fmt.Sprintf("error: %T", pErr.GetDetail()))
log.Trace(traceCtx, fmt.Sprintf("error: %T", pErr.GetDetail()))
}
if br.Error != nil {
panic(roachpb.ErrorUnexpectedlySet(n.stores, br))
Expand Down
7 changes: 5 additions & 2 deletions storage/gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"sync"
"time"

"golang.org/x/net/context"

"github.com/gogo/protobuf/proto"

"github.com/cockroachdb/cockroach/client"
Expand Down Expand Up @@ -330,6 +332,7 @@ func processSequenceCache(
// 8) send a GCRequest.
func (gcq *gcQueue) process(now roachpb.Timestamp, repl *Replica,
sysCfg config.SystemConfig) error {
ctx := repl.context(context.TODO())

snap := repl.store.Engine().NewSnapshot()
desc := repl.Desc()
Expand All @@ -346,7 +349,7 @@ func (gcq *gcQueue) process(now roachpb.Timestamp, repl *Replica,
pushTxn(repl, now, txn, typ)
},
func(intents []roachpb.Intent, poison bool, wait bool) error {
return repl.store.intentResolver.resolveIntents(repl.context(), repl, intents, poison, wait)
return repl.store.intentResolver.resolveIntents(ctx, repl, intents, poison, wait)
})

if err != nil {
Expand All @@ -369,7 +372,7 @@ func (gcq *gcQueue) process(now roachpb.Timestamp, repl *Replica,
ba.RangeID = desc.RangeID
ba.Timestamp = now
ba.Add(&gcArgs)
if _, pErr := repl.Send(repl.context(), ba); pErr != nil {
if _, pErr := repl.Send(ctx, ba); pErr != nil {
return pErr.GoError()
}
return nil
Expand Down
8 changes: 5 additions & 3 deletions storage/gc_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"testing"
"time"

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/client"
"github.com/cockroachdb/cockroach/keys"
"github.com/cockroachdb/cockroach/roachpb"
Expand Down Expand Up @@ -220,7 +222,7 @@ func TestGCQueueProcess(t *testing.T) {
txn.OrigTimestamp = datum.ts
txn.Timestamp = datum.ts
}
if _, err := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{
if _, err := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{
Timestamp: datum.ts,
Txn: txn,
}, &dArgs); err != nil {
Expand All @@ -234,7 +236,7 @@ func TestGCQueueProcess(t *testing.T) {
txn.OrigTimestamp = datum.ts
txn.Timestamp = datum.ts
}
if _, err := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{
if _, err := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{
Timestamp: datum.ts,
Txn: txn,
}, &pArgs); err != nil {
Expand Down Expand Up @@ -477,7 +479,7 @@ func TestGCQueueIntentResolution(t *testing.T) {
// TODO(spencerkimball): benchmark with ~50k.
for j := 0; j < 5; j++ {
pArgs := putArgs(roachpb.Key(fmt.Sprintf("%d-%05d", i, j)), []byte("value"))
if _, err := client.SendWrappedWith(tc.Sender(), tc.rng.context(), roachpb.Header{
if _, err := client.SendWrappedWith(tc.Sender(), tc.rng.context(context.Background()), roachpb.Header{
Txn: txns[i],
}, &pArgs); err != nil {
t.Fatalf("%d: could not put data: %s", i, err)
Expand Down
2 changes: 1 addition & 1 deletion storage/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA
return
}
now := r.store.Clock().Now()
ctx := r.context()
ctx := r.context(context.TODO())
stopper := r.store.Stopper()

for _, item := range intents {
Expand Down
4 changes: 3 additions & 1 deletion storage/intent_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package storage
import (
"testing"

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/testutils"
"github.com/cockroachdb/cockroach/util/leaktest"
Expand All @@ -36,7 +38,7 @@ func TestPushTransactionsWithNonPendingIntent(t *testing.T) {

intents := []roachpb.Intent{{Span: roachpb.Span{Key: roachpb.Key("a")}, Status: roachpb.ABORTED}}
if _, pErr := tc.store.intentResolver.maybePushTransactions(
tc.rng.context(), intents, roachpb.Header{}, roachpb.PUSH_TOUCH, true); !testutils.IsPError(pErr, "unexpected aborted/resolved intent") {
tc.rng.context(context.Background()), intents, roachpb.Header{}, roachpb.PUSH_TOUCH, true); !testutils.IsPError(pErr, "unexpected aborted/resolved intent") {
t.Errorf("expected error on aborted/resolved intent, but got %s", pErr)
}
}
4 changes: 3 additions & 1 deletion storage/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync/atomic"
"time"

"golang.org/x/net/context"
"golang.org/x/net/trace"

"github.com/cockroachdb/cockroach/config"
Expand Down Expand Up @@ -115,6 +116,7 @@ type queueImpl interface {

// process accepts current time, a replica, and the system config
// and executes queue-specific work on it.
// TODO(nvanbenschoten) this should provide a context.Context.
process(roachpb.Timestamp, *Replica, config.SystemConfig) error

// timer returns a duration to wait between processing the next item
Expand Down Expand Up @@ -414,7 +416,7 @@ func (bq *baseQueue) processReplica(repl *Replica, clock *hlc.Clock) error {
// and renew or acquire if necessary.
if bq.impl.needsLeaderLease() {
sp := repl.store.Tracer().StartSpan(bq.name)
ctx := opentracing.ContextWithSpan(repl.context(), sp)
ctx := opentracing.ContextWithSpan(repl.context(context.Background()), sp)
defer sp.Finish()
// Create a "fake" get request in order to invoke redirectOnOrAcquireLease.
if err := repl.redirectOnOrAcquireLeaderLease(ctx); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions storage/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (t *RaftTransport) processQueue(nodeID roachpb.NodeID) {
return
}
client := NewMultiRaftClient(conn)
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
if log.V(1) {
log.Infof("establishing Raft transport stream to node %d at %s", nodeID, addr)
Expand Down Expand Up @@ -227,7 +227,7 @@ func (t *RaftTransport) processQueue(nodeID roachpb.NodeID) {
return
case req := <-ch:
if req.Message.Type == raftpb.MsgSnap {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
snapStream, err := client.RaftMessage(ctx)
if err != nil {
Expand Down
28 changes: 14 additions & 14 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,12 +376,12 @@ func (r *Replica) setReplicaIDLocked(replicaID roachpb.ReplicaID) error {
return nil
}

// context returns a context which is initialized with information about
// this range. It is only relevant when commands need to be executed
// on this range in the absence of a pre-existing context, such as
// during range scanner operations.
func (r *Replica) context() context.Context {
return context.WithValue(r.store.Context(nil), log.RangeID, r.RangeID)
// context returns a context with information about this range, derived from
// the supplied context (which is not allowed to be nil). It is only relevant
// when commands need to be executed on this range in the absence of a
// pre-existing context, such as during range scanner operations.
func (r *Replica) context(ctx context.Context) context.Context {
return context.WithValue(r.store.context(ctx), log.RangeID, r.RangeID)
}

// GetMaxBytes atomically gets the range maximum byte limit.
Expand Down Expand Up @@ -479,7 +479,7 @@ func (r *Replica) requestLeaderLease(timestamp roachpb.Timestamp) <-chan *roachp
// checks from normal request machinery, (e.g. the command queue).
// Note that the command itself isn't traced, but usually the caller
// waiting for the result has an active Trace.
cmd, err := r.proposeRaftCommand(r.context(), ba)
cmd, err := r.proposeRaftCommand(r.context(context.Background()), ba)
if err != nil {
return roachpb.NewError(err)
}
Expand Down Expand Up @@ -1399,7 +1399,7 @@ func (r *Replica) sendRaftMessage(msg raftpb.Message) {
// the command's done channel, if available.
func (r *Replica) processRaftCommand(idKey storagebase.CmdIDKey, index uint64, raftCmd roachpb.RaftCommand) *roachpb.Error {
if index == 0 {
log.Fatalc(r.context(), "processRaftCommand requires a non-zero index")
log.Fatalc(r.context(context.TODO()), "processRaftCommand requires a non-zero index")
}

r.mu.Lock()
Expand All @@ -1413,7 +1413,7 @@ func (r *Replica) processRaftCommand(idKey storagebase.CmdIDKey, index uint64, r
ctx = cmd.ctx
} else {
// TODO(tschottdorf): consider the Trace situation here.
ctx = r.context()
ctx = r.context(context.Background())
}

log.Trace(ctx, "applying batch")
Expand All @@ -1426,7 +1426,7 @@ func (r *Replica) processRaftCommand(idKey storagebase.CmdIDKey, index uint64, r
if cmd != nil {
cmd.done <- roachpb.ResponseWithError{Reply: br, Err: err}
} else if err != nil && log.V(1) {
log.Errorc(r.context(), "error executing raft command: %s", err)
log.Errorc(r.context(context.TODO()), "error executing raft command: %s", err)
}

return err
Expand Down Expand Up @@ -1815,7 +1815,7 @@ func (r *Replica) maybeGossipFirstRange() *roachpb.Error {
return nil
}

ctx := r.context()
ctx := r.context(context.TODO())

// When multiple nodes are initialized with overlapping Gossip addresses, they all
// will attempt to gossip their cluster ID. This is a fairly obvious misconfiguration,
Expand Down Expand Up @@ -1879,7 +1879,7 @@ func (r *Replica) maybeGossipSystemConfig() {
return
}

ctx := r.context()
ctx := r.context(context.TODO())
// TODO(marc): check for bad split in the middle of the SystemConfig span.
kvs, hash, err := r.loadSystemConfigSpan()
if err != nil {
Expand Down Expand Up @@ -1928,7 +1928,7 @@ func newReplicaCorruptionError(errs ...error) *roachpb.ReplicaCorruptionError {
// range, store, node or cluster with corresponding actions taken.
func (r *Replica) maybeSetCorrupt(pErr *roachpb.Error) *roachpb.Error {
if cErr, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
log.Errorc(r.context(), "stalling replica due to: %s", cErr.ErrorMsg)
log.Errorc(r.context(context.TODO()), "stalling replica due to: %s", cErr.ErrorMsg)
cErr.Processed = true
return roachpb.NewError(cErr)
}
Expand All @@ -1946,7 +1946,7 @@ func (r *Replica) loadSystemConfigSpan() ([]roachpb.KeyValue, []byte, error) {
ba.Timestamp = r.store.Clock().Now()
ba.Add(&roachpb.ScanRequest{Span: keys.SystemConfigSpan})
br, intents, pErr :=
r.executeBatch(r.context(), storagebase.CmdIDKey(""), r.store.Engine(), nil, ba)
r.executeBatch(r.context(context.TODO()), storagebase.CmdIDKey(""), r.store.Engine(), nil, ba)
if pErr != nil {
return nil, nil, pErr.GoError()
}
Expand Down
11 changes: 6 additions & 5 deletions storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (r *Replica) executeCmd(ctx context.Context, raftCmdID storagebase.CmdIDKey
reply = &resp
case *roachpb.EndTransactionRequest:
var resp roachpb.EndTransactionResponse
resp, intents, err = r.EndTransaction(batch, ms, h, *tArgs)
resp, intents, err = r.EndTransaction(ctx, batch, ms, h, *tArgs)
reply = &resp
case *roachpb.RangeLookupRequest:
var resp roachpb.RangeLookupResponse
Expand Down Expand Up @@ -370,7 +370,7 @@ func (r *Replica) BeginTransaction(
// TODO(tschottdorf): return nil reply on any error. The error itself
// must be the authoritative source of information.
func (r *Replica) EndTransaction(
batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, args roachpb.EndTransactionRequest,
ctx context.Context, batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, args roachpb.EndTransactionRequest,
) (roachpb.EndTransactionResponse, []roachpb.Intent, error) {
var reply roachpb.EndTransactionResponse

Expand Down Expand Up @@ -558,7 +558,7 @@ func (r *Replica) EndTransaction(

if err := func() error {
if ct.GetSplitTrigger() != nil {
if err := r.splitTrigger(r.context(), batch, ms, ct.SplitTrigger, reply.Txn.Timestamp); err != nil {
if err := r.splitTrigger(r.context(ctx), batch, ms, ct.SplitTrigger, reply.Txn.Timestamp); err != nil {
return err
}
*ms = engine.MVCCStats{} // clear stats, as split recomputed.
Expand Down Expand Up @@ -1379,6 +1379,7 @@ func (r *Replica) LeaderLease(
func (r *Replica) CheckConsistency(
args roachpb.CheckConsistencyRequest, desc *roachpb.RangeDescriptor,
) (roachpb.CheckConsistencyResponse, *roachpb.Error) {
ctx := r.context(context.TODO())
key := desc.StartKey.AsRawKey()
endKey := desc.EndKey.AsRawKey()
id := uuid.MakeV4()
Expand All @@ -1396,7 +1397,7 @@ func (r *Replica) CheckConsistency(
ChecksumID: id,
}
ba.Add(checkArgs)
_, pErr := r.Send(r.context(), ba)
_, pErr := r.Send(ctx, ba)
if pErr != nil {
return roachpb.CheckConsistencyResponse{}, pErr
}
Expand Down Expand Up @@ -1427,7 +1428,7 @@ func (r *Replica) CheckConsistency(
Checksum: c.checksum,
}
ba.Add(checkArgs)
_, pErr := r.Send(r.context(), ba)
_, pErr := r.Send(ctx, ba)
if pErr != nil {
return roachpb.CheckConsistencyResponse{}, pErr
}
Expand Down
Loading

0 comments on commit 4b0baf5

Please sign in to comment.