Skip to content

Commit

Permalink
Merge pull request cockroachdb#7566 from BramGruneir/trace
Browse files Browse the repository at this point in the history
storage: plumb a tracing context into all queues' process()
  • Loading branch information
BramGruneir authored Jul 6, 2016
2 parents 4466300 + c7a77cd commit 7a07990
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 117 deletions.
13 changes: 7 additions & 6 deletions storage/gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ import (
"sync"
"time"

"golang.org/x/net/context"

"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/base"
"github.com/cockroachdb/cockroach/config"
Expand Down Expand Up @@ -286,10 +285,12 @@ func processAbortCache(
// 6) scan the abort cache table for old entries
// 7) push these transactions (again, recreating txn entries).
// 8) send a GCRequest.
func (gcq *gcQueue) process(now hlc.Timestamp, repl *Replica,
sysCfg config.SystemConfig) error {
ctx := repl.context(context.TODO())

func (gcq *gcQueue) process(
ctx context.Context,
now hlc.Timestamp,
repl *Replica,
sysCfg config.SystemConfig,
) error {
snap := repl.store.Engine().NewSnapshot()
desc := repl.Desc()
defer snap.Close()
Expand Down
10 changes: 5 additions & 5 deletions storage/gc_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/keys"
Expand All @@ -35,8 +37,6 @@ import (
"github.com/cockroachdb/cockroach/util/leaktest"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/uuid"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
)

// makeTS creates a new hybrid logical timestamp.
Expand Down Expand Up @@ -273,7 +273,7 @@ func TestGCQueueProcess(t *testing.T) {

// Process through a scan queue.
gcQ := newGCQueue(tc.gossip)
if err := gcQ.process(tc.clock.Now(), tc.rng, cfg); err != nil {
if err := gcQ.process(context.Background(), tc.clock.Now(), tc.rng, cfg); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -494,7 +494,7 @@ func TestGCQueueTransactionTable(t *testing.T) {
t.Fatal("config not set")
}

if err := gcQ.process(tc.clock.Now(), tc.rng, cfg); err != nil {
if err := gcQ.process(context.Background(), tc.clock.Now(), tc.rng, cfg); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -592,7 +592,7 @@ func TestGCQueueIntentResolution(t *testing.T) {

// Process through a scan queue.
gcQ := newGCQueue(tc.gossip)
if err := gcQ.process(tc.clock.Now(), tc.rng, cfg); err != nil {
if err := gcQ.process(context.Background(), tc.clock.Now(), tc.rng, cfg); err != nil {
t.Fatal(err)
}

Expand Down
26 changes: 17 additions & 9 deletions storage/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"sync/atomic"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"golang.org/x/net/context"
"golang.org/x/net/trace"

Expand All @@ -33,8 +35,6 @@ import (
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/stop"
"github.com/cockroachdb/cockroach/util/timeutil"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -112,8 +112,7 @@ type queueImpl interface {

// process accepts current time, a replica, and the system config
// and executes queue-specific work on it.
// TODO(nvanbenschoten) this should take a context.Context.
process(hlc.Timestamp, *Replica, config.SystemConfig) error
process(context.Context, hlc.Timestamp, *Replica, config.SystemConfig) error

// timer returns a duration to wait between processing the next item
// from the queue.
Expand Down Expand Up @@ -204,7 +203,12 @@ type baseQueue struct {
// maxSize doesn't prevent new replicas from being added, it just
// limits the total size. Higher priority replicas can still be
// added; their addition simply removes the lowest priority replica.
func makeBaseQueue(name string, impl queueImpl, gossip *gossip.Gossip, cfg queueConfig) baseQueue {
func makeBaseQueue(
name string,
impl queueImpl,
gossip *gossip.Gossip,
cfg queueConfig,
) baseQueue {
bq := baseQueue{
name: name,
impl: impl,
Expand Down Expand Up @@ -429,13 +433,15 @@ func (bq *baseQueue) processReplica(repl *Replica, clock *hlc.Clock) error {
return nil
}

sp := repl.store.Tracer().StartSpan(bq.name)
ctx := opentracing.ContextWithSpan(repl.context(context.Background()), sp)
log.Trace(ctx, fmt.Sprintf("queue start for range %d", repl.RangeID))
defer sp.Finish()

// If the queue requires a replica to have the range leader lease in
// order to be processed, check whether this replica has leader lease
// and renew or acquire if necessary.
if bq.needsLeaderLease {
sp := repl.store.Tracer().StartSpan(bq.name)
ctx := opentracing.ContextWithSpan(repl.context(context.Background()), sp)
defer sp.Finish()
// Create a "fake" get request in order to invoke redirectOnOrAcquireLease.
if err := repl.redirectOnOrAcquireLeaderLease(ctx); err != nil {
if _, harmless := err.GetDetail().(*roachpb.NotLeaderError); harmless {
Expand All @@ -444,14 +450,16 @@ func (bq *baseQueue) processReplica(repl *Replica, clock *hlc.Clock) error {
}
return errors.Wrapf(err.GoError(), "%s: could not obtain lease", repl)
}
log.Trace(ctx, "got range lease")
}

bq.eventLog.VInfof(log.V(3), "%s: processing", repl)
start := timeutil.Now()
if err := bq.impl.process(clock.Now(), repl, cfg); err != nil {
if err := bq.impl.process(ctx, clock.Now(), repl, cfg); err != nil {
return err
}
bq.eventLog.VInfof(log.V(2), "%s: done: %s", repl, timeutil.Since(start))
log.Trace(ctx, "done")
return nil
}

Expand Down
Loading

0 comments on commit 7a07990

Please sign in to comment.