Skip to content

Commit

Permalink
Merge pull request cockroachdb#10549 from tamird/storage-queue-logging
Browse files Browse the repository at this point in the history
storage: annotate contexts instead of manually logging
  • Loading branch information
tamird authored Nov 8, 2016
2 parents 0ebe696 + ab52993 commit 8f278e6
Showing 1 changed file with 20 additions and 20 deletions.
40 changes: 20 additions & 20 deletions pkg/storage/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (bq *baseQueue) Start(clock *hlc.Clock, stopper *stop.Stopper) {
func (bq *baseQueue) Add(repl *Replica, priority float64) (bool, error) {
bq.mu.Lock()
defer bq.mu.Unlock()
ctx := bq.AnnotateCtx(context.TODO())
ctx := repl.AnnotateCtx(bq.AnnotateCtx(context.TODO()))
return bq.addInternal(ctx, repl.Desc(), true, priority)
}

Expand All @@ -306,7 +306,7 @@ func (bq *baseQueue) MaybeAdd(repl *Replica, now hlc.Timestamp) {
return
}

ctx := bq.AnnotateCtx(context.TODO())
ctx := repl.AnnotateCtx(bq.AnnotateCtx(context.TODO()))

if !cfgOk {
log.VEvent(ctx, 1, "no system config available. skipping")
Expand All @@ -316,7 +316,7 @@ func (bq *baseQueue) MaybeAdd(repl *Replica, now hlc.Timestamp) {
if requiresSplit {
// Range needs to be split due to zone configs, but queue does
// not accept unsplit ranges.
log.VEventf(ctx, 1, "%s: split needed; not adding", repl)
log.VEventf(ctx, 1, "split needed; not adding")
return
}

Expand All @@ -325,14 +325,14 @@ func (bq *baseQueue) MaybeAdd(repl *Replica, now hlc.Timestamp) {
// holder is.
if lease, _ := repl.getLease(); lease != nil &&
lease.Covers(repl.store.Clock().Now()) && !lease.OwnedBy(repl.store.StoreID()) {
log.VEventf(ctx, 1, "%s: needs lease; not adding: %+v", repl, lease)
log.VEventf(ctx, 1, "needs lease; not adding: %+v", lease)
return
}
}

should, priority := bq.impl.shouldQueue(ctx, now, repl, cfg)
if _, err := bq.addInternal(ctx, repl.Desc(), should, priority); !isExpectedQueueError(err) {
log.Errorf(ctx, "unable to add %s: %s", repl, err)
log.Errorf(ctx, "unable to add: %s", err)
}
}

Expand Down Expand Up @@ -385,15 +385,14 @@ func (bq *baseQueue) addInternal(
return false, errReplicaNotAddable
} else if ok {
if item.priority != priority {
log.Eventf(ctx, "%s: updating priority: %0.3f -> %0.3f",
desc, item.priority, priority)
log.Eventf(ctx, "updating priority: %0.3f -> %0.3f", item.priority, priority)
}
// Replica has already been added; update priority.
bq.mu.priorityQ.update(item, priority)
return false, nil
}

log.VEventf(ctx, 3, "%s: adding: priority=%0.3f", desc, priority)
log.VEventf(ctx, 3, "adding: priority=%0.3f", priority)
item = &replicaItem{value: desc.RangeID, priority: priority}
bq.add(item)

Expand Down Expand Up @@ -470,9 +469,10 @@ func (bq *baseQueue) processLoop(clock *hlc.Clock, stopper *stop.Stopper) {
repl := bq.pop()
if repl != nil {
if stopper.RunTask(func() {
if err := bq.processReplica(ctx, repl, clock); err != nil {
annotatedCtx := repl.AnnotateCtx(ctx)
if err := bq.processReplica(annotatedCtx, repl, clock); err != nil {
// Maybe add failing replica to purgatory if the queue supports it.
bq.maybeAddToPurgatory(ctx, repl, err, clock, stopper)
bq.maybeAddToPurgatory(annotatedCtx, repl, err, clock, stopper)
}
}) != nil {
return
Expand Down Expand Up @@ -507,7 +507,7 @@ func (bq *baseQueue) processReplica(
if bq.requiresSplit(cfg, repl) {
// Range needs to be split due to zone configs, but queue does
// not accept unsplit ranges.
log.VEventf(queueCtx, 3, "%s: split needed; skipping", repl)
log.VEventf(queueCtx, 3, "split needed; skipping")
return nil
}

Expand Down Expand Up @@ -565,7 +565,7 @@ func (bq *baseQueue) maybeAddToPurgatory(

// Check whether the failure is a purgatory error and whether the queue supports it.
if _, ok := triggeringErr.(purgatoryError); !ok || bq.impl.purgatoryChan() == nil {
log.Errorf(ctx, "on %s: %s", repl, triggeringErr)
log.Error(ctx, triggeringErr)
return
}
bq.mu.Lock()
Expand All @@ -576,7 +576,7 @@ func (bq *baseQueue) maybeAddToPurgatory(
return
}

log.Errorf(ctx, "(purgatory) on %s: %s", repl, triggeringErr)
log.Error(ctx, errors.Wrap(triggeringErr, "purgatory"))

item := &replicaItem{value: repl.RangeID}
bq.mu.replicas[repl.RangeID] = item
Expand Down Expand Up @@ -618,8 +618,9 @@ func (bq *baseQueue) maybeAddToPurgatory(
return
}
if stopper.RunTask(func() {
if err := bq.processReplica(ctx, repl, clock); err != nil {
bq.maybeAddToPurgatory(ctx, repl, err, clock, stopper)
annotatedCtx := repl.AnnotateCtx(ctx)
if err := bq.processReplica(annotatedCtx, repl, clock); err != nil {
bq.maybeAddToPurgatory(annotatedCtx, repl, err, clock, stopper)
}
}) != nil {
return
Expand Down Expand Up @@ -696,12 +697,11 @@ func (bq *baseQueue) remove(item *replicaItem) {
// Exposed for testing only.
func (bq *baseQueue) DrainQueue(clock *hlc.Clock) {
ctx := bq.AnnotateCtx(context.TODO())
repl := bq.pop()
for repl != nil {
if err := bq.processReplica(ctx, repl, clock); err != nil {
for repl := bq.pop(); repl != nil; repl = bq.pop() {
annotatedCtx := repl.AnnotateCtx(ctx)
if err := bq.processReplica(annotatedCtx, repl, clock); err != nil {
bq.failures.Inc(1)
log.Errorf(ctx, "failed processing replica %s: %s", repl, err)
log.Error(annotatedCtx, err)
}
repl = bq.pop()
}
}

0 comments on commit 8f278e6

Please sign in to comment.