From be9a6123dd7224e4ca87ece1f9966a167968d589 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 22 Jun 2023 17:24:47 -0700 Subject: [PATCH 1/2] sem/builtins: fix json_populate_record in an edge case This commit fixes `json_populate_record` builtin in an edge case. In particular, this generator builtin calls `eval.PopulateRecordWithJSON` which modifies the passed-in tuple in-place, and right now the builtin passes the input tuple. This leads to modification of the Datum which is not allowed. However, this is mostly philosophical bug that doesn't lead to any actual issues since from a single input tuple the builtin only generates a single output tuple. I noticed this problem when tried to re-execute the distributed query as local, but the tuple was corrupted for that second local execution. Release note: None --- pkg/sql/sem/builtins/generator_builtins.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/sql/sem/builtins/generator_builtins.go b/pkg/sql/sem/builtins/generator_builtins.go index 86c353d6e8d0..b8f733bbdbff 100644 --- a/pkg/sql/sem/builtins/generator_builtins.go +++ b/pkg/sql/sem/builtins/generator_builtins.go @@ -1760,10 +1760,12 @@ func (j *jsonPopulateRecordGenerator) Next(ctx context.Context) (bool, error) { // Values is part of the tree.ValueGenerator interface. func (j jsonPopulateRecordGenerator) Values() (tree.Datums, error) { - if err := eval.PopulateRecordWithJSON(j.ctx, j.evalCtx, j.target, j.input.ResolvedType(), j.input); err != nil { + output := tree.NewDTupleWithLen(j.input.ResolvedType(), j.input.D.Len()) + copy(output.D, j.input.D) + if err := eval.PopulateRecordWithJSON(j.ctx, j.evalCtx, j.target, j.input.ResolvedType(), output); err != nil { return nil, err } - return j.input.D, nil + return output.D, nil } func makeJSONPopulateRecordSetGenerator( From a85dbd982a29b783ac5b66ee5ada45bfb3b9541c Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 22 Jun 2023 16:30:29 -0700 Subject: [PATCH 2/2] sql: re-execute distributed query as local for some errors This commit teaches the main query code path (i.e. ignoring sub- and post-queries) to retry distributed plans as local in some cases. In particular, we use this retry mechanism if: - the error is SQL retryable (i.e. it'll have a high chance of success during the local execution) - no data has been pushed to the result writer by the distributed query (this shouldn't be a frequent scenario since most SQL retryable errors are likely to occur during the plan setup / before any data can be produced by the query). This retry mechanism allows us to hide transient network problems, and - more importantly - in the multi-tenant model it allows us to go around the problem when "not ready" SQL instance is being used for DistSQL planning (e.g. the instance might have been brought down, but the cache on top of the system table hasn't been updated accordingly). I believe that no matter the improvements that we can make to the instance cache, there will also be a window (which should hopefully getting smaller - according to David T it's currently 45s but he hopes to bring it down to 7s or so) during which the instance cache is stale, so DistSQL planner could use "not ready" instances. The rationale for why it is ok to do this retry is that we create brand-new processors that aren't affiliated to the distributed plan that was just cleaned up. It's worth mentioning that the planNode tree couldn't have been reused in this way, but if we needed to execute any planNodes directly, then we would have to run such a plan in a local fashion. In other words, the fact that we had a distributed plan initially guarantees that we don't have any planNodes to be concerned about. Possible downside to this approach is that it increases overall query latency, so ideally we wouldn't plan on "not ready" instances in the first place (and we have issues to improve there), but given that we now fully parallelize the setup of distributed plans, the latency increase should be bound, assuming that most retryable errors occur during the distributed plan setup. Release note: None --- pkg/sql/distsql_running.go | 111 ++++++++++++++++++++++++++-- pkg/sql/distsql_running_test.go | 123 ++++++++++++++++++++++++++++++++ pkg/sql/sqltelemetry/exec.go | 8 +++ 3 files changed, 238 insertions(+), 4 deletions(-) diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 84b0ed9c8d65..d8e3bb98fb11 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -655,7 +655,7 @@ const executingParallelAndSerialChecks = "executing %d checks concurrently and % // - evalCtx is the evaluation context in which the plan will run. It might be // mutated. // - finishedSetupFn, if non-nil, is called synchronously after all the -// processors have successfully started up. +// processors have been created but haven't started running yet. func (dsp *DistSQLPlanner) Run( ctx context.Context, planCtx *PlanningCtx, @@ -940,6 +940,10 @@ type DistSQLReceiver struct { // See EXECUTE .. DISCARD ROWS. discardRows bool + // dataPushed is set once at least one row, one batch, or one non-error + // piece of metadata is pushed to the result writer. + dataPushed bool + // commErr keeps track of the error received from interacting with the // resultWriter. This represents a "communication error" and as such is unlike // query execution errors: when the DistSQLReceiver is used within a SQL @@ -1223,6 +1227,20 @@ func MakeDistSQLReceiver( return r } +// resetForLocalRerun prepares the DistSQLReceiver to be used again for +// executing the plan - that encountered an error when run in the distributed +// fashion - locally. +func (r *DistSQLReceiver) resetForLocalRerun(stats topLevelQueryStats) { + r.resultWriterMu.row.SetError(nil) + r.updateStatus.Store(false) + r.status = execinfra.NeedMoreRows + r.dataPushed = false + r.closed = false + r.stats = stats + r.egressCounter = nil + atomic.StoreUint64(r.progressAtomic, math.Float64bits(0)) +} + // Release releases this DistSQLReceiver back to the pool. func (r *DistSQLReceiver) Release() { r.cleanup() @@ -1332,6 +1350,9 @@ func (r *DistSQLReceiver) checkConcurrentError() { func (r *DistSQLReceiver) pushMeta(meta *execinfrapb.ProducerMetadata) execinfra.ConsumerStatus { if metaWriter, ok := r.resultWriterMu.row.(MetadataResultWriter); ok { metaWriter.AddMeta(r.ctx, meta) + if meta.Err == nil { + r.dataPushed = true + } } if meta.LeafTxnFinalState != nil { if r.txn != nil { @@ -1486,6 +1507,7 @@ func (r *DistSQLReceiver) Push( if commErr := r.resultWriterMu.row.AddRow(r.ctx, r.row); commErr != nil { r.handleCommErr(commErr) } + r.dataPushed = true return r.status } @@ -1541,6 +1563,7 @@ func (r *DistSQLReceiver) PushBatch( if commErr := r.resultWriterMu.batch.AddBatch(r.ctx, batch); commErr != nil { r.handleCommErr(commErr) } + r.dataPushed = true return r.status } @@ -1583,7 +1606,8 @@ func (r *DistSQLReceiver) ProducerDone() { // function updates the passed-in planner to make sure that the "inner" plans // use the LeafTxns if the "outer" plan happens to have concurrency. It also // returns a non-nil cleanup function that must be called once all plans (the -// "outer" as well as all "inner" ones) are done. +// "outer" as well as all "inner" ones) are done. The returned functions can be +// called multiple times. func getFinishedSetupFn(planner *planner) (finishedSetupFn func(flowinfra.Flow), cleanup func()) { finishedSetupFn = func(localFlow flowinfra.Flow) { if localFlow.GetFlowCtx().Txn.Type() == kv.LeafTxn { @@ -1883,14 +1907,30 @@ func (dsp *DistSQLPlanner) planAndRunSubquery( return nil } +var distributedQueryRerunAsLocalEnabled = settings.RegisterBoolSetting( + settings.TenantWritable, + "sql.distsql.distributed_query_rerun_locally.enabled", + "determines whether the distributed plans can be rerun locally for some errors", + true, +) + // PlanAndRun generates a physical plan from a planNode tree and executes it. It -// assumes that the tree is supported (see CheckSupport). +// assumes that the tree is supported (see checkSupportForPlanNode). // // All errors encountered are reported to the DistSQLReceiver's resultWriter. // Additionally, if the error is a "communication error" (an error encountered // while using that resultWriter), the error is also stored in // DistSQLReceiver.commErr. That can be tested to see if a client session needs // to be closed. +// +// An allow-list of errors that are encountered during the distributed query +// execution are transparently retried by re-planning and re-running the query +// as local (as long as no data has been communicated to the result writer). +// +// - finishedSetupFn, if non-nil, is called synchronously after all the local +// processors have been created but haven't started running yet. If the query is +// re-planned as local after having encountered an error during distributed +// execution, then finishedSetupFn will be called twice. func (dsp *DistSQLPlanner) PlanAndRun( ctx context.Context, evalCtx *extendedEvalContext, @@ -1901,7 +1941,9 @@ func (dsp *DistSQLPlanner) PlanAndRun( finishedSetupFn func(localFlow flowinfra.Flow), ) { log.VEventf(ctx, 2, "creating DistSQL plan with isLocal=%v", planCtx.isLocal) - + // Copy query-level stats before executing this plan in case we need to + // re-run it as local. + subqueriesStats := recv.stats physPlan, physPlanCleanup, err := dsp.createPhysPlan(ctx, planCtx, plan) defer physPlanCleanup() if err != nil { @@ -1911,6 +1953,67 @@ func (dsp *DistSQLPlanner) PlanAndRun( finalizePlanWithRowCount(ctx, planCtx, physPlan, planCtx.planner.curPlan.mainRowCount) recv.expectedRowsRead = int64(physPlan.TotalEstimatedScannedRows) dsp.Run(ctx, planCtx, txn, physPlan, recv, evalCtx, finishedSetupFn) + if distributedErr := recv.getError(); distributedErr != nil && !planCtx.isLocal && + distributedQueryRerunAsLocalEnabled.Get(&dsp.st.SV) { + // If we had a distributed plan which resulted in an error, we want to + // retry this query as local in some cases. In particular, this retry + // mechanism allows us to hide transient network problems, and - more + // importantly - in the multi-tenant model it allows us to go around the + // problem when "not ready" SQL instance is being used for DistSQL + // planning (e.g. the instance might have been brought down, but the + // cache on top of the system table hasn't been updated accordingly). + // + // The rationale for why it is ok to do so is that we'll create + // brand-new processors that aren't affiliated to the distributed plan + // that was just cleaned up. It's worth mentioning that the planNode + // tree couldn't have been reused in this way, but if we needed to + // execute any planNodes directly, then we would have to run such a plan + // in a local fashion. In other words, the fact that we had a + // distributed plan initially guarantees that we don't have any + // planNodes to be concerned about. + // TODO(yuzefovich): consider introducing this retry mechanism to sub- + // and post-queries too. + if recv.dataPushed || plan.isPhysicalPlan() { + // If some data has already been pushed to the result writer, we + // cannot retry. Also, we cannot re-plan locally if we used the + // experimental DistSQL spec planning factory. + return + } + if recv.commErr != nil || ctx.Err() != nil { + // For communication errors, we don't try to rerun the query since + // the connection is toast. We also give up if the context + // cancellation has already occurred. + return + } + if !pgerror.IsSQLRetryableError(distributedErr) && !flowinfra.IsFlowRetryableError(distributedErr) { + // Only re-run the query if we think there is a high chance of a + // successful local execution. + return + } + log.VEventf(ctx, 1, "encountered an error when running the distributed plan, re-running it as local: %v", distributedErr) + recv.resetForLocalRerun(subqueriesStats) + telemetry.Inc(sqltelemetry.DistributedErrorLocalRetryAttempt) + defer func() { + if recv.getError() == nil { + telemetry.Inc(sqltelemetry.DistributedErrorLocalRetrySuccess) + } + }() + // Note that since we're going to execute the query locally now, there + // is no point in providing the locality filter since it will be ignored + // anyway, so we don't use NewPlanningCtxWithOracle constructor. + localPlanCtx := dsp.NewPlanningCtx( + ctx, evalCtx, planCtx.planner, evalCtx.Txn, DistributionTypeNone, + ) + localPhysPlan, localPhysPlanCleanup, err := dsp.createPhysPlan(ctx, localPlanCtx, plan) + defer localPhysPlanCleanup() + if err != nil { + recv.SetError(err) + return + } + finalizePlanWithRowCount(ctx, localPlanCtx, localPhysPlan, localPlanCtx.planner.curPlan.mainRowCount) + recv.expectedRowsRead = int64(localPhysPlan.TotalEstimatedScannedRows) + dsp.Run(ctx, localPlanCtx, txn, localPhysPlan, recv, evalCtx, finishedSetupFn) + } } // PlanAndRunCascadesAndChecks runs any cascade and check queries. diff --git a/pkg/sql/distsql_running_test.go b/pkg/sql/distsql_running_test.go index 32f87389b5c0..74bf5b8bc53f 100644 --- a/pkg/sql/distsql_running_test.go +++ b/pkg/sql/distsql_running_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/parser" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" @@ -972,3 +973,125 @@ CREATE TABLE child ( sqlDB.Exec(t, fmt.Sprintf(`%[1]sINSERT INTO child VALUES (%[2]d, %[2]d, %[2]d)`, prefix, id)) } } + +// TestDistributedQueryErrorIsRetriedLocally verifies that if a query with a +// distributed plan results in a SQL retryable error, then it is rerun as local +// transparently. +func TestDistributedQueryErrorIsRetriedLocally(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Start a 3 node cluster where we can inject an error for SetupFlow RPC on + // the server side for the queries in question. + const numNodes = 3 + getError := func(nodeID base.SQLInstanceID) error { + return errors.Newf("connection refused: n%d", nodeID) + } + // Assert that the injected error is in the allow-list of errors that are + // retried transparently. + if err := getError(base.SQLInstanceID(0)); !pgerror.IsSQLRetryableError(err) { + t.Fatalf("expected error to be in the allow-list for a retry: %v", err) + } + + // We use different queries to simplify handling the node ID on which the + // error should be injected (i.e. we avoid the need for synchronization in + // the test). In particular, the difficulty comes from the fact that some of + // the SetupFlow RPCs might not be issued at all while others are served + // after the corresponding flow on the gateway has exited. + queries := []string{ + "SELECT k FROM test.foo", + "SELECT v FROM test.foo", + "SELECT * FROM test.foo", + } + stmtToNodeIDForError := map[string]base.SQLInstanceID{ + queries[0]: 2, // error on n2 + queries[1]: 3, // error on n3 + queries[2]: 0, // no error + } + tc := serverutils.StartNewTestCluster(t, numNodes, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + DistSQL: &execinfra.TestingKnobs{ + SetupFlowCb: func(_ context.Context, nodeID base.SQLInstanceID, req *execinfrapb.SetupFlowRequest) error { + nodeIDForError, ok := stmtToNodeIDForError[req.StatementSQL] + if !ok || nodeIDForError != nodeID { + return nil + } + return getError(nodeID) + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(context.Background()) + + // Create a table with 30 rows, split them into 3 ranges with each node + // having one. + db := tc.ServerConn(0) + sqlDB := sqlutils.MakeSQLRunner(db) + sqlutils.CreateTable( + t, db, "foo", + "k INT PRIMARY KEY, v INT", + 30, + sqlutils.ToRowFn(sqlutils.RowIdxFn, sqlutils.RowModuloFn(2)), + ) + sqlDB.Exec(t, "ALTER TABLE test.foo SPLIT AT VALUES (10), (20)") + sqlDB.Exec( + t, + fmt.Sprintf("ALTER TABLE test.foo EXPERIMENTAL_RELOCATE VALUES (ARRAY[%d], 0), (ARRAY[%d], 10), (ARRAY[%d], 20)", + tc.Server(0).GetFirstStoreID(), + tc.Server(1).GetFirstStoreID(), + tc.Server(2).GetFirstStoreID(), + ), + ) + + for _, query := range queries { + nodeID := stmtToNodeIDForError[query] + injectError := nodeID != base.SQLInstanceID(0) + if injectError { + t.Logf("running %q with error being injected on n%d", query, nodeID) + } else { + t.Logf("running %q without error being injected", query) + } + sqlDB.Exec(t, "SET TRACING=on;") + _, err := db.Exec(query) + // We expect that the query was retried as local which should succeed. + require.NoError(t, err) + sqlDB.Exec(t, "SET TRACING=off;") + trace := sqlDB.QueryStr(t, "SELECT message FROM [SHOW TRACE FOR SESSION]") + // Inspect the trace to ensure that the query was, indeed, initially run + // as distributed but hit a retryable error and was rerun as local. + var foundDistributed, foundLocal bool + for _, message := range trace { + if strings.Contains(message[0], "creating DistSQL plan with isLocal=false") { + foundDistributed = true + } else if strings.Contains(message[0], "encountered an error when running the distributed plan, re-running it as local") { + foundLocal = true + } + } + if injectError { + if !foundDistributed || !foundLocal { + t.Fatalf("with remote error injection, foundDistributed=%t, foundLocal=%t\ntrace:%s", foundDistributed, foundLocal, trace) + } + } else { + // When no error is injected, the query should succeed right away + // when run in distributed fashion. + if !foundDistributed || foundLocal { + t.Fatalf("without remote error injection, foundDistributed=%t, foundLocal=%t\ntrace:%s", foundDistributed, foundLocal, trace) + } + } + } + + // Now disable the retry mechanism and ensure that when remote error is + // injected, it is returned as the query result. + sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.distributed_query_rerun_locally.enabled = false;") + for _, query := range queries[:2] { + nodeID := stmtToNodeIDForError[query] + t.Logf("running %q with error being injected on n%d but local retry disabled", query, nodeID) + _, err := db.Exec(query) + require.NotNil(t, err) + // lib/pq wraps the error, so we cannot use errors.Is() check. + require.True(t, strings.Contains(err.Error(), getError(nodeID).Error())) + } +} diff --git a/pkg/sql/sqltelemetry/exec.go b/pkg/sql/sqltelemetry/exec.go index dd19d4be3b68..0c6a72d93f43 100644 --- a/pkg/sql/sqltelemetry/exec.go +++ b/pkg/sql/sqltelemetry/exec.go @@ -37,3 +37,11 @@ var CascadesLimitReached = telemetry.GetCounterOnce("sql.exec.cascade-limit-reac // HashAggregationDiskSpillingDisabled is to be incremented whenever the disk // spilling of the vectorized hash aggregator is disabled. var HashAggregationDiskSpillingDisabled = telemetry.GetCounterOnce("sql.exec.hash-agg-spilling-disabled") + +// DistributedErrorLocalRetryAttempt is to be incremented whenever a distributed +// query error results in a local rerun. +var DistributedErrorLocalRetryAttempt = telemetry.GetCounterOnce("sql.exec.dist-query-rerun-locally-attempt") + +// DistributedErrorLocalRetrySuccess is to be incremented whenever the local +// rerun - of a distributed query that hit SQL retryable error - succeeded. +var DistributedErrorLocalRetrySuccess = telemetry.GetCounterOnce("sql.exec.dist-query-rerun-locally-success")