storage: Improve handling of lease index retries #35261

Merged Mar 19, 2019 · 10 commits
111 changes: 111 additions & 0 deletions pkg/storage/batcheval/cmd_lease_test.go
@@ -0,0 +1,111 @@
// Copyright 2019 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.

package batcheval

import (
	"context"
	"testing"
	"time"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// TestLeaseTransferWithPipelinedWrite verifies that pipelined writes
// do not cause retry errors to be leaked to clients when the error
// can be handled internally. Pipelining dissociates a write from its
// caller, so internally-generated errors (specifically out-of-order
// lease indexes) must be retried below that level.
//
// This issue was observed in practice to affect the first insert
// after table creation with high probability.
func TestLeaseTransferWithPipelinedWrite(t *testing.T) {
	defer leaktest.AfterTest(t)()

	ctx := context.Background()

	tc := serverutils.StartTestCluster(t, 3, base.TestClusterArgs{})
	defer tc.Stopper().Stop(ctx)

	db := tc.ServerConn(0)

	// More than 30 iterations is flaky under stressrace on teamcity.
	for iter := 0; iter < 30; iter++ {
		log.Infof(ctx, "iter %d", iter)
		if _, err := db.ExecContext(ctx, "drop table if exists test"); err != nil {
			t.Fatal(err)
		}
		if _, err := db.ExecContext(ctx, "create table test (a int, b int, primary key (a, b))"); err != nil {
			t.Fatal(err)
		}

		workerErrCh := make(chan error, 1)
		go func() {
			workerErrCh <- func() error {
				for i := 0; i < 1; i++ {
					tx, err := db.BeginTx(ctx, nil)
					if err != nil {
						return err
					}
					defer func() {
						if tx != nil {
							if err := tx.Rollback(); err != nil {
								log.Warningf(ctx, "error rolling back: %s", err)
							}
						}
					}()
					// Run two inserts in a transaction to ensure that we have
					// pipelined writes that cannot be retried at the SQL layer
					// due to the first-statement rule.
					if _, err := tx.ExecContext(ctx, "INSERT INTO test (a, b) VALUES ($1, $2)", i, 1); err != nil {
						return err
					}
					if _, err := tx.ExecContext(ctx, "INSERT INTO test (a, b) VALUES ($1, $2)", i, 2); err != nil {
						return err
					}
					if err := tx.Commit(); err != nil {
						return err
					}
					tx = nil
				}
				return nil
			}()
		}()

		// TODO(bdarnell): This test reliably reproduced the issue when
		// introduced, because table creation causes splits and repeated
		// table creation leads to lease transfers due to rebalancing.
		// This is a subtle thing to rely on and the test might become
		// more reliable if we ran more iterations in the worker goroutine
		// and added a second goroutine to explicitly transfer leases back
		// and forth.

		select {
		case <-time.After(15 * time.Second):
			// TODO(bdarnell): The test seems flaky under stress with a 5s
			// timeout. Why? I'm giving it a high timeout since hanging
			// isn't a failure mode we're particularly concerned about here,
			// but it shouldn't be taking this long even with stress.
			t.Fatal("timed out")
		case err := <-workerErrCh:
			if err != nil {
				t.Fatalf("worker failed: %s", err)
			}
		}
	}
}
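The TODO above suggests adding a second goroutine that explicitly transfers leases back and forth. A minimal sketch of what that could look like follows; it is not part of this PR and makes two assumptions: that the test is started via testcluster.StartTestCluster (so tc exposes LookupRange, Target, and TransferRangeLease), and that a hypothetical testTableStartKey helper exists to resolve the start key of the test table.

// Sketch only (not part of this change): shuttle the lease for the test
// table's range between two stores so the worker's pipelined writes keep
// racing against lease transfers.
func shuttleLeases(t *testing.T, tc *testcluster.TestCluster, stop <-chan struct{}) {
	desc, err := tc.LookupRange(testTableStartKey(t)) // hypothetical key lookup
	if err != nil {
		t.Error(err)
		return
	}
	for i := 0; ; i++ {
		select {
		case <-stop:
			return
		default:
		}
		// Alternate the lease between the first two nodes.
		if err := tc.TransferRangeLease(desc, tc.Target(i%2)); err != nil {
			t.Error(err)
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
}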
6 changes: 2 additions & 4 deletions pkg/storage/replica.go
@@ -985,13 +985,11 @@ type endCmds struct {
 
 // done releases the latches acquired by the command and updates
 // the timestamp cache using the final timestamp of each command.
-func (ec *endCmds) done(
-	br *roachpb.BatchResponse, pErr *roachpb.Error, retry proposalReevaluationReason,
-) {
+func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error) {
 	// Update the timestamp cache if the request is not being re-evaluated. Each
 	// request is considered in turn; only those marked as affecting the cache are
 	// processed. Inconsistent reads are excluded.
-	if retry == proposalNoReevaluation && ec.ba.ReadConsistency == roachpb.CONSISTENT {
+	if ec.ba.ReadConsistency == roachpb.CONSISTENT {
 		ec.repl.updateTimestampCache(&ec.ba, br, pErr)
 	}
 
14 changes: 6 additions & 8 deletions pkg/storage/replica_proposal.go
@@ -110,7 +110,7 @@ type ProposalData struct {
 // counted on to invoke endCmds itself.)
 func (proposal *ProposalData) finishApplication(pr proposalResult) {
 	if proposal.endCmds != nil {
-		proposal.endCmds.done(pr.Reply, pr.Err, pr.ProposalRetry)
+		proposal.endCmds.done(pr.Reply, pr.Err)
 		proposal.endCmds = nil
 	}
 	if proposal.sp != nil {
@@ -808,14 +808,12 @@ func (r *Replica) handleEvalResultRaftMuLocked(
 }
 
 // proposalResult indicates the result of a proposal. Exactly one of
-// Reply, Err and ProposalRetry is set, and it represents the result of
-// the proposal.
+// Reply and Err is set, and it represents the result of the proposal.
 type proposalResult struct {
-	Reply         *roachpb.BatchResponse
-	Err           *roachpb.Error
-	ProposalRetry proposalReevaluationReason
-	Intents       []result.IntentsWithArg
-	EndTxns       []result.EndTxnIntents
+	Reply   *roachpb.BatchResponse
+	Err     *roachpb.Error
+	Intents []result.IntentsWithArg
+	EndTxns []result.EndTxnIntents
 }
 
 // evaluateProposal generates a Result from the given request by
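With ProposalRetry removed, waiting on a proposal collapses to a two-way branch on Reply and Err. A minimal sketch of that consumption pattern, assuming a hypothetical proposalCh channel that delivers the finished proposalResult (the actual delivery path in replica.go is more involved):

// Sketch only; proposalCh stands in for whatever channel delivers the
// finished proposal's result to the waiting goroutine.
pr := <-proposalCh
if pr.Err != nil {
	return nil, pr.Err // failure: no re-evaluation decision leaks to this level anymore
}
return pr.Reply, nil // success: exactly one of Reply and Err is set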