Skip to content

Commit

Permalink
storage: PR feedback
Browse files Browse the repository at this point in the history
Change map keys from pointers to values.
Add logging.
Wrap long lines and update comments.
  • Loading branch information
bdarnell committed Mar 14, 2016
1 parent 8e44cdc commit 6be1079
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 25 deletions.
3 changes: 2 additions & 1 deletion storage/gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ func (gcq *gcQueue) process(now roachpb.Timestamp, repl *Replica,
}
}

if pErr := repl.store.intentResolver.resolveIntents(repl.context(), repl, intents, true /* wait */, false /* !poison */); pErr != nil {
if pErr := repl.store.intentResolver.resolveIntents(repl.context(), repl, intents,
true /* wait */, false /* !poison */); pErr != nil {
return pErr.GoError()
}

Expand Down
62 changes: 43 additions & 19 deletions storage/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ const intentResolverTaskLimit = 100
type intentResolver struct {
store *Store

sem chan struct{} // Semaphore to limit async goroutines
sem chan struct{} // Semaphore to limit async goroutines.

mu struct {
sync.Mutex
// Maps transaction ids to a refcount.
inFlight map[*uuid.UUID]int
inFlight map[uuid.UUID]int
}
}

Expand All @@ -58,7 +58,7 @@ func newIntentResolver(store *Store) *intentResolver {
store: store,
sem: make(chan struct{}, intentResolverTaskLimit),
}
ir.mu.inFlight = map[*uuid.UUID]int{}
ir.mu.inFlight = map[uuid.UUID]int{}
return ir
}

Expand All @@ -69,10 +69,13 @@ func newIntentResolver(store *Store) *intentResolver {
//
// The returned error may be a copy of the original WriteIntentError,
// with or without the Resolved flag set, which governs the client's
// retry behavior. (if the transaction is pushed, the Resolved flag is
// retry behavior (if the transaction is pushed, the Resolved flag is
// set to tell the client to retry immediately; otherwise it is false
// to cause the client to back off).
func (ir *intentResolver) processWriteIntentError(ctx context.Context, wiErr roachpb.WriteIntentError, r *Replica, args roachpb.Request, h roachpb.Header, pushType roachpb.PushTxnType) *roachpb.Error {
func (ir *intentResolver) processWriteIntentError(ctx context.Context,
wiErr roachpb.WriteIntentError, r *Replica, args roachpb.Request, h roachpb.Header,
pushType roachpb.PushTxnType) *roachpb.Error {

if log.V(6) {
log.Infoc(ctx, "resolving write intent %s", wiErr)
}
Expand All @@ -81,7 +84,8 @@ func (ir *intentResolver) processWriteIntentError(ctx context.Context, wiErr roa
readOnly := roachpb.IsReadOnly(args) // TODO(tschottdorf): pass as param

resolveIntents, pushErr := ir.maybePushTransactions(ctx, wiErr.Intents, h, pushType, false)
if resErr := ir.resolveIntents(ctx, r, resolveIntents, false /* !wait */, true /* poison */); resErr != nil {
if resErr := ir.resolveIntents(ctx, r, resolveIntents,
false /* !wait */, true /* poison */); resErr != nil {
// When resolving without waiting, errors should not
// usually be returned here, although there are some cases
// when they may be (especially when a test cluster is in
Expand Down Expand Up @@ -127,14 +131,25 @@ func (ir *intentResolver) processWriteIntentError(ctx context.Context, wiErr roa
// maybePushTransaction, but if the error is non-nil then some of the
// conflicting transactions may still be pending.
//
// If skipInFlight is true, then no PushTxns will be sent and no
// intents will be returned for any transaction for which there is
// another push in progress. This should only be used by callers who
// are not relying on the side effect of a push (i.e. only
// pushType==PUSH_TOUCH), and who also don't need to synchronize with
// the resolution of those intents (e.g. asynchronous resolutions of
// intents skipped on inconsistent reads).
//
// Callers are involved with
// a) conflict resolution for commands being executed at the Store with the
// client waiting,
// b) resolving intents encountered during inconsistent operations, and
// c) resolving intents upon EndTransaction which are not local to the given
// range. This is the only path in which the transaction is going to be
// in non-pending state and doesn't require a push.
func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []roachpb.Intent, h roachpb.Header, pushType roachpb.PushTxnType, skipInFlight bool) ([]roachpb.Intent, *roachpb.Error) {
func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []roachpb.Intent,
h roachpb.Header, pushType roachpb.PushTxnType, skipInFlight bool) (
[]roachpb.Intent, *roachpb.Error) {

now := ir.store.Clock().Now()

pusherTxn := h.Txn
Expand All @@ -161,13 +176,16 @@ func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []r
// because the transaction is already finalized.
// TODO(bdarnell): can this happen any more?
resolveIntents = append(resolveIntents, intent)
} else if _, ok := ir.mu.inFlight[intent.Txn.ID]; ok && skipInFlight {
} else if _, ok := ir.mu.inFlight[*intent.Txn.ID]; ok && skipInFlight {
// Another goroutine is working on this transaction so we can
// skip it.
if log.V(1) {
log.Infof("skipping PushTxn for %s; attempt already in flight", intent.Txn.ID)
}
continue
} else {
pushIntents = append(pushIntents, intent)
ir.mu.inFlight[intent.Txn.ID]++
ir.mu.inFlight[*intent.Txn.ID]++
}
}
ir.mu.Unlock()
Expand Down Expand Up @@ -198,9 +216,9 @@ func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []r
br, err := ir.store.db.RunWithResponse(b)
ir.mu.Lock()
for _, intent := range pushIntents {
ir.mu.inFlight[intent.Txn.ID]--
if ir.mu.inFlight[intent.Txn.ID] == 0 {
delete(ir.mu.inFlight, intent.Txn.ID)
ir.mu.inFlight[*intent.Txn.ID]--
if ir.mu.inFlight[*intent.Txn.ID] == 0 {
delete(ir.mu.inFlight, *intent.Txn.ID)
}
}
ir.mu.Unlock()
Expand All @@ -221,7 +239,7 @@ func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []r
// processIntentsAsync asynchronously processes intents which were
// encountered during another command but did not interfere with the
// execution of that command. This occurs in two cases: inconsistent
// reads and EndTransaction (which queues its own intents for
// reads and EndTransaction (which queues its own external intents for
// processing via this method). The two cases are handled somewhat
// differently and would be better served by different entry points,
// but combining them simplifies the plumbing necessary in Replica.
Expand All @@ -245,7 +263,8 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA
h := roachpb.Header{Timestamp: now}
resolveIntents, pushErr := ir.maybePushTransactions(ctxWithTimeout,
item.intents, h, roachpb.PUSH_TOUCH, true /* skipInFlight */)
if pErr := ir.resolveIntents(ctxWithTimeout, r, resolveIntents, true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil {
if pErr := ir.resolveIntents(ctxWithTimeout, r, resolveIntents,
true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil {
log.Warningc(ctxWithTimeout, "failed to resolve intents: %s", pErr)
return
}
Expand All @@ -261,7 +280,8 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA

// For EndTransaction, we know the transaction is finalized so
// we can skip the push and go straight to the resolve.
if pErr := ir.resolveIntents(ctxWithTimeout, r, item.intents, true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil {
if pErr := ir.resolveIntents(ctxWithTimeout, r, item.intents,
true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil {
log.Warningc(ctxWithTimeout, "failed to resolve intents: %s", pErr)
return
}
Expand Down Expand Up @@ -302,8 +322,9 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA
EndKey: r.Desc().EndKey.AsRawKey(),
},
}
gcArgs.Keys = append(gcArgs.Keys, roachpb.GCRequest_GCKey{Key: keys.TransactionKey(txn.Key, txn.ID)})

gcArgs.Keys = append(gcArgs.Keys, roachpb.GCRequest_GCKey{
Key: keys.TransactionKey(txn.Key, txn.ID),
})
ba.Add(&gcArgs)
if _, pErr := r.addWriteCmd(ctxWithTimeout, ba, nil /* nil */); pErr != nil {
log.Warningf("could not GC completed transaction: %s", pErr)
Expand All @@ -322,11 +343,14 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA
// executed). This ensures that if a waiting client retries
// immediately after calling this function, it will not hit the same
// intents again.
func (ir *intentResolver) resolveIntents(ctx context.Context, r *Replica, intents []roachpb.Intent, wait bool, poison bool) *roachpb.Error {
func (ir *intentResolver) resolveIntents(ctx context.Context, r *Replica,
intents []roachpb.Intent, wait bool, poison bool) *roachpb.Error {

sp, cleanupSp := tracing.SpanFromContext(opReplica, ir.store.Tracer(), ctx)
defer cleanupSp()

ctx = opentracing.ContextWithSpan(ctx, nil) // we're doing async stuff below; those need new traces
// We're doing async stuff below; those need new traces.
ctx = opentracing.ContextWithSpan(ctx, nil)
sp.LogEvent(fmt.Sprintf("resolving intents [wait=%t]", wait))

var reqsRemote []roachpb.Request
Expand Down
11 changes: 6 additions & 5 deletions storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2286,11 +2286,12 @@ func TestReplicaResolveIntentNoWait(t *testing.T) {
setupResolutionTest(t, tc, roachpb.Key("a") /* irrelevant */, splitKey)
txn := newTransaction("name", key, 1, roachpb.SERIALIZABLE, tc.clock)
txn.Status = roachpb.COMMITTED
if pErr := tc.store.intentResolver.resolveIntents(context.Background(), tc.rng, []roachpb.Intent{{
Span: roachpb.Span{Key: key},
Txn: txn.TxnMeta,
Status: txn.Status,
}}, false /* !wait */, false /* !poison; irrelevant */); pErr != nil {
if pErr := tc.store.intentResolver.resolveIntents(context.Background(), tc.rng,
[]roachpb.Intent{{
Span: roachpb.Span{Key: key},
Txn: txn.TxnMeta,
Status: txn.Status,
}}, false /* !wait */, false /* !poison; irrelevant */); pErr != nil {
t.Fatal(pErr)
}
util.SucceedsSoon(t, func() error {
Expand Down
11 changes: 11 additions & 0 deletions util/stop/stopper.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,22 @@ func (s *Stopper) RunAsyncTask(f func()) bool {
func (s *Stopper) RunLimitedAsyncTask(sem chan struct{}, f func()) bool {
file, line, _ := caller.Lookup(1)
key := taskKey{file, line}

// Wait for permission to run from the semaphore.
select {
case sem <- struct{}{}:
case <-s.ShouldDrain():
return false
default:
log.Printf("stopper throttling task from %s:%d due to semaphore", file, line)
// Retry the select without the default.
select {
case sem <- struct{}{}:
case <-s.ShouldDrain():
return false
}
}

if !s.runPrelude(key) {
<-sem
return false
Expand Down

0 comments on commit 6be1079

Please sign in to comment.