diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 84b0ed9c8d65..c37ac5b34408 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -655,7 +655,7 @@ const executingParallelAndSerialChecks = "executing %d checks concurrently and % // - evalCtx is the evaluation context in which the plan will run. It might be // mutated. // - finishedSetupFn, if non-nil, is called synchronously after all the -// processors have successfully started up. +// processors have been created but haven't started running yet. func (dsp *DistSQLPlanner) Run( ctx context.Context, planCtx *PlanningCtx, @@ -940,6 +940,10 @@ type DistSQLReceiver struct { // See EXECUTE .. DISCARD ROWS. discardRows bool + // dataPushed is set once at least one row, one batch, or one non-error + // piece of metadata is pushed to the result writer. + dataPushed bool + // commErr keeps track of the error received from interacting with the // resultWriter. This represents a "communication error" and as such is unlike // query execution errors: when the DistSQLReceiver is used within a SQL @@ -1223,6 +1227,20 @@ func MakeDistSQLReceiver( return r } +// resetForLocalRerun prepares the DistSQLReceiver to be used again for +// executing the plan - that encountered an error when run in the distributed +// fashion - locally. +func (r *DistSQLReceiver) resetForLocalRerun(stats topLevelQueryStats) { + r.resultWriterMu.row.SetError(nil) + r.updateStatus.Store(false) + r.status = execinfra.NeedMoreRows + r.dataPushed = false + r.closed = false + r.stats = stats + r.egressCounter = nil + atomic.StoreUint64(r.progressAtomic, math.Float64bits(0)) +} + // Release releases this DistSQLReceiver back to the pool. func (r *DistSQLReceiver) Release() { r.cleanup() @@ -1332,6 +1350,9 @@ func (r *DistSQLReceiver) checkConcurrentError() { func (r *DistSQLReceiver) pushMeta(meta *execinfrapb.ProducerMetadata) execinfra.ConsumerStatus { if metaWriter, ok := r.resultWriterMu.row.(MetadataResultWriter); ok { metaWriter.AddMeta(r.ctx, meta) + if meta.Err == nil { + r.dataPushed = true + } } if meta.LeafTxnFinalState != nil { if r.txn != nil { @@ -1486,6 +1507,7 @@ func (r *DistSQLReceiver) Push( if commErr := r.resultWriterMu.row.AddRow(r.ctx, r.row); commErr != nil { r.handleCommErr(commErr) } + r.dataPushed = true return r.status } @@ -1541,6 +1563,7 @@ func (r *DistSQLReceiver) PushBatch( if commErr := r.resultWriterMu.batch.AddBatch(r.ctx, batch); commErr != nil { r.handleCommErr(commErr) } + r.dataPushed = true return r.status } @@ -1583,7 +1606,8 @@ func (r *DistSQLReceiver) ProducerDone() { // function updates the passed-in planner to make sure that the "inner" plans // use the LeafTxns if the "outer" plan happens to have concurrency. It also // returns a non-nil cleanup function that must be called once all plans (the -// "outer" as well as all "inner" ones) are done. +// "outer" as well as all "inner" ones) are done. The returned functions can be +// called multiple times. func getFinishedSetupFn(planner *planner) (finishedSetupFn func(flowinfra.Flow), cleanup func()) { finishedSetupFn = func(localFlow flowinfra.Flow) { if localFlow.GetFlowCtx().Txn.Type() == kv.LeafTxn { @@ -1883,14 +1907,30 @@ func (dsp *DistSQLPlanner) planAndRunSubquery( return nil } +var distributedQueryRerunAsLocalEnabled = settings.RegisterBoolSetting( + settings.TenantWritable, + "sql.distsql.distributed_query_rerun_locally.enabled", + "determines whether the distributed plans can be rerun locally for some errors", + true, +) + // PlanAndRun generates a physical plan from a planNode tree and executes it. It -// assumes that the tree is supported (see CheckSupport). +// assumes that the tree is supported (see checkSupportForPlanNode). // // All errors encountered are reported to the DistSQLReceiver's resultWriter. // Additionally, if the error is a "communication error" (an error encountered // while using that resultWriter), the error is also stored in // DistSQLReceiver.commErr. That can be tested to see if a client session needs // to be closed. +// +// An allow-list of errors that are encountered during the distributed query +// execution are transparently retried by re-planning and re-running the query +// as local (as long as no data has been communicated to the result writer). +// +// - finishedSetupFn, if non-nil, is called synchronously after all the local +// processors have been created but haven't started running yet. If the query is +// re-planned as local after having encountered an error during distributed +// execution, then finishedSetupFn will be called twice. func (dsp *DistSQLPlanner) PlanAndRun( ctx context.Context, evalCtx *extendedEvalContext, @@ -1901,7 +1941,9 @@ func (dsp *DistSQLPlanner) PlanAndRun( finishedSetupFn func(localFlow flowinfra.Flow), ) { log.VEventf(ctx, 2, "creating DistSQL plan with isLocal=%v", planCtx.isLocal) - + // Copy query-level stats before executing this plan in case we need to + // re-run it as local. + subqueriesStats := recv.stats physPlan, physPlanCleanup, err := dsp.createPhysPlan(ctx, planCtx, plan) defer physPlanCleanup() if err != nil { @@ -1911,6 +1953,61 @@ func (dsp *DistSQLPlanner) PlanAndRun( finalizePlanWithRowCount(ctx, planCtx, physPlan, planCtx.planner.curPlan.mainRowCount) recv.expectedRowsRead = int64(physPlan.TotalEstimatedScannedRows) dsp.Run(ctx, planCtx, txn, physPlan, recv, evalCtx, finishedSetupFn) + if distributedErr := recv.getError(); distributedErr != nil && !planCtx.isLocal && + distributedQueryRerunAsLocalEnabled.Get(&dsp.st.SV) { + // If we had a distributed plan which resulted in an error, we want to + // retry this query as local in some cases. In particular, this retry + // mechanism allows us to hide transient network problems, and - more + // importantly - in the multi-tenant model it allows us to go around the + // problem when "not ready" SQL instance is being used for DistSQL + // planning (e.g. the instance might have been brought down, but the + // cache on top of the system table hasn't been updated accordingly). + // + // The rationale for why it is ok to do so is that we'll create + // brand-new processors that aren't affiliated to the distributed plan + // that was just cleaned up. It's worth mentioning that the planNode + // tree couldn't have been reused in this way, but if we needed to + // execute any planNodes directly, then we would have to run such a plan + // in a local fashion. In other words, the fact that we had a + // distributed plan initially guarantees that we don't have any + // planNodes to be concerned about. + // TODO(yuzefovich): consider introducing this retry mechanism to sub- + // and post-queries too. + if recv.dataPushed || plan.isPhysicalPlan() { + // If some data has already been pushed to the result writer, we + // cannot retry. Also, we cannot re-plan locally if we used the + // experimental DistSQL spec planning factory. + return + } + if recv.commErr != nil || ctx.Err() != nil { + // For communication errors, we don't try to rerun the query since + // the connection is toast. We also give up if the context + // cancellation has already occurred. + return + } + if !pgerror.IsSQLRetryableError(distributedErr) && !flowinfra.IsFlowRetryableError(distributedErr) { + // Only re-run the query if we think there is a high chance of a + // successful local execution. + return + } + log.VEventf(ctx, 1, "encountered an error when running the distributed plan, re-running it as local: %v", distributedErr) + recv.resetForLocalRerun(subqueriesStats) + // Note that since we're going to execute the query locally now, there + // is no point in providing the locality filter since it will be ignored + // anyway, so we don't use NewPlanningCtxWithOracle constructor. + localPlanCtx := dsp.NewPlanningCtx( + ctx, evalCtx, planCtx.planner, evalCtx.Txn, DistributionTypeNone, + ) + localPhysPlan, localPhysPlanCleanup, err := dsp.createPhysPlan(ctx, localPlanCtx, plan) + defer localPhysPlanCleanup() + if err != nil { + recv.SetError(err) + return + } + finalizePlanWithRowCount(ctx, localPlanCtx, localPhysPlan, localPlanCtx.planner.curPlan.mainRowCount) + recv.expectedRowsRead = int64(localPhysPlan.TotalEstimatedScannedRows) + dsp.Run(ctx, localPlanCtx, txn, localPhysPlan, recv, evalCtx, finishedSetupFn) + } } // PlanAndRunCascadesAndChecks runs any cascade and check queries. diff --git a/pkg/sql/distsql_running_test.go b/pkg/sql/distsql_running_test.go index 32f87389b5c0..74bf5b8bc53f 100644 --- a/pkg/sql/distsql_running_test.go +++ b/pkg/sql/distsql_running_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/parser" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" @@ -972,3 +973,125 @@ CREATE TABLE child ( sqlDB.Exec(t, fmt.Sprintf(`%[1]sINSERT INTO child VALUES (%[2]d, %[2]d, %[2]d)`, prefix, id)) } } + +// TestDistributedQueryErrorIsRetriedLocally verifies that if a query with a +// distributed plan results in a SQL retryable error, then it is rerun as local +// transparently. +func TestDistributedQueryErrorIsRetriedLocally(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Start a 3 node cluster where we can inject an error for SetupFlow RPC on + // the server side for the queries in question. + const numNodes = 3 + getError := func(nodeID base.SQLInstanceID) error { + return errors.Newf("connection refused: n%d", nodeID) + } + // Assert that the injected error is in the allow-list of errors that are + // retried transparently. + if err := getError(base.SQLInstanceID(0)); !pgerror.IsSQLRetryableError(err) { + t.Fatalf("expected error to be in the allow-list for a retry: %v", err) + } + + // We use different queries to simplify handling the node ID on which the + // error should be injected (i.e. we avoid the need for synchronization in + // the test). In particular, the difficulty comes from the fact that some of + // the SetupFlow RPCs might not be issued at all while others are served + // after the corresponding flow on the gateway has exited. + queries := []string{ + "SELECT k FROM test.foo", + "SELECT v FROM test.foo", + "SELECT * FROM test.foo", + } + stmtToNodeIDForError := map[string]base.SQLInstanceID{ + queries[0]: 2, // error on n2 + queries[1]: 3, // error on n3 + queries[2]: 0, // no error + } + tc := serverutils.StartNewTestCluster(t, numNodes, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + DistSQL: &execinfra.TestingKnobs{ + SetupFlowCb: func(_ context.Context, nodeID base.SQLInstanceID, req *execinfrapb.SetupFlowRequest) error { + nodeIDForError, ok := stmtToNodeIDForError[req.StatementSQL] + if !ok || nodeIDForError != nodeID { + return nil + } + return getError(nodeID) + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(context.Background()) + + // Create a table with 30 rows, split them into 3 ranges with each node + // having one. + db := tc.ServerConn(0) + sqlDB := sqlutils.MakeSQLRunner(db) + sqlutils.CreateTable( + t, db, "foo", + "k INT PRIMARY KEY, v INT", + 30, + sqlutils.ToRowFn(sqlutils.RowIdxFn, sqlutils.RowModuloFn(2)), + ) + sqlDB.Exec(t, "ALTER TABLE test.foo SPLIT AT VALUES (10), (20)") + sqlDB.Exec( + t, + fmt.Sprintf("ALTER TABLE test.foo EXPERIMENTAL_RELOCATE VALUES (ARRAY[%d], 0), (ARRAY[%d], 10), (ARRAY[%d], 20)", + tc.Server(0).GetFirstStoreID(), + tc.Server(1).GetFirstStoreID(), + tc.Server(2).GetFirstStoreID(), + ), + ) + + for _, query := range queries { + nodeID := stmtToNodeIDForError[query] + injectError := nodeID != base.SQLInstanceID(0) + if injectError { + t.Logf("running %q with error being injected on n%d", query, nodeID) + } else { + t.Logf("running %q without error being injected", query) + } + sqlDB.Exec(t, "SET TRACING=on;") + _, err := db.Exec(query) + // We expect that the query was retried as local which should succeed. + require.NoError(t, err) + sqlDB.Exec(t, "SET TRACING=off;") + trace := sqlDB.QueryStr(t, "SELECT message FROM [SHOW TRACE FOR SESSION]") + // Inspect the trace to ensure that the query was, indeed, initially run + // as distributed but hit a retryable error and was rerun as local. + var foundDistributed, foundLocal bool + for _, message := range trace { + if strings.Contains(message[0], "creating DistSQL plan with isLocal=false") { + foundDistributed = true + } else if strings.Contains(message[0], "encountered an error when running the distributed plan, re-running it as local") { + foundLocal = true + } + } + if injectError { + if !foundDistributed || !foundLocal { + t.Fatalf("with remote error injection, foundDistributed=%t, foundLocal=%t\ntrace:%s", foundDistributed, foundLocal, trace) + } + } else { + // When no error is injected, the query should succeed right away + // when run in distributed fashion. + if !foundDistributed || foundLocal { + t.Fatalf("without remote error injection, foundDistributed=%t, foundLocal=%t\ntrace:%s", foundDistributed, foundLocal, trace) + } + } + } + + // Now disable the retry mechanism and ensure that when remote error is + // injected, it is returned as the query result. + sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.distributed_query_rerun_locally.enabled = false;") + for _, query := range queries[:2] { + nodeID := stmtToNodeIDForError[query] + t.Logf("running %q with error being injected on n%d but local retry disabled", query, nodeID) + _, err := db.Exec(query) + require.NotNil(t, err) + // lib/pq wraps the error, so we cannot use errors.Is() check. + require.True(t, strings.Contains(err.Error(), getError(nodeID).Error())) + } +}