diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 8dc33a325faa..20069905df9b 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -228,7 +228,6 @@ ALL_TESTS = [ "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test", - "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test", "//pkg/kv/kvserver/kvstorage:kvstorage_test", @@ -1309,8 +1308,6 @@ GO_TARGETS = [ "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test", - "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer", - "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test", @@ -2777,7 +2774,6 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:get_x_data", - "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:get_x_data", "//pkg/kv/kvserver/kvserverbase:get_x_data", diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go index dbb3d46cedf0..7879ce03f36d 100644 --- a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go +++ b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go @@ -264,7 +264,7 @@ func alterTenantJobCutover( cutoverTime, record.Timestamp) } } - if err := completeStreamIngestion(ctx, jobRegistry, txn, tenInfo.TenantReplicationJobID, cutoverTime); err != nil { + if err := applyCutoverTime(ctx, jobRegistry, txn, tenInfo.TenantReplicationJobID, cutoverTime); err != nil { return hlc.Timestamp{}, err } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go b/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go index 4755b67c79af..dccdd0d10932 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go @@ -37,7 +37,7 @@ type streamIngestManagerImpl struct { func (r *streamIngestManagerImpl) CompleteStreamIngestion( ctx context.Context, ingestionJobID jobspb.JobID, cutoverTimestamp hlc.Timestamp, ) error { - return completeStreamIngestion(ctx, r.jobRegistry, r.txn, ingestionJobID, cutoverTimestamp) + return applyCutoverTime(ctx, r.jobRegistry, r.txn, ingestionJobID, cutoverTimestamp) } // GetStreamIngestionStats implements streaming.StreamIngestManager interface. 
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 22d85985ecf2..11417b83ccc5 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -38,8 +38,9 @@ import ( "github.com/cockroachdb/errors" ) -// completeStreamIngestion terminates the stream as of specified time. -func completeStreamIngestion( +// applyCutoverTime modifies the consumer job record with a cutover time and +// unpauses the job if necessary. +func applyCutoverTime( ctx context.Context, jobRegistry *jobs.Registry, txn isql.Txn, diff --git a/pkg/cli/clisqlshell/BUILD.bazel b/pkg/cli/clisqlshell/BUILD.bazel index f5cceb24007f..2da958ea419b 100644 --- a/pkg/cli/clisqlshell/BUILD.bazel +++ b/pkg/cli/clisqlshell/BUILD.bazel @@ -41,6 +41,7 @@ go_library( "//pkg/util/sysutil", "@com_github_charmbracelet_bubbles//cursor", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_errors//oserror", "@com_github_knz_bubbline//:bubbline", "@com_github_knz_bubbline//computil", "@com_github_knz_bubbline//editline", diff --git a/pkg/cli/clisqlshell/context.go b/pkg/cli/clisqlshell/context.go index a69ab572e63a..4f1662c72981 100644 --- a/pkg/cli/clisqlshell/context.go +++ b/pkg/cli/clisqlshell/context.go @@ -53,6 +53,10 @@ type Context struct { // CockroachDB's own CLI package has a more advanced URL // parser that is used instead. ParseURL URLParser + + // CertsDir is an extra directory to look for client certs in, + // when the \c command is used. + CertsDir string } // internalContext represents the internal configuration state of the diff --git a/pkg/cli/clisqlshell/sql.go b/pkg/cli/clisqlshell/sql.go index a46c68cabf54..9e6d7de47dc9 100644 --- a/pkg/cli/clisqlshell/sql.go +++ b/pkg/cli/clisqlshell/sql.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/sysutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/errors/oserror" "github.com/knz/bubbline/editline" "github.com/knz/bubbline/history" ) @@ -66,9 +67,10 @@ Query Buffer Connection \info display server details including connection strings. - \c, \connect {[DB] [USER] [HOST] [PORT] | [URL]} + \c, \connect {[DB] [USER] [HOST] [PORT] [options] | [URL]} connect to a server or print the current connection URL. - (Omitted values reuse previous parameters. Use '-' to skip a field.) + Omitted values reuse previous parameters. Use '-' to skip a field. + The option "autocerts" attempts to auto-discover TLS client certs. \password [USERNAME] securely change the password for a user @@ -1629,7 +1631,7 @@ func (c *cliState) handleConnect( cmd []string, loopState, errState cliStateEnum, ) (resState cliStateEnum) { if err := c.handleConnectInternal(cmd, false /*omitConnString*/); err != nil { - fmt.Fprintln(c.iCtx.stderr, err) + clierror.OutputError(c.iCtx.stderr, err, true /*showSeverity*/, false /*verbose*/) c.exitErr = err return errState } @@ -1691,10 +1693,21 @@ func (c *cliState) handleConnectInternal(cmd []string, omitConnString bool) erro return err } + var autoCerts bool + // Parse the arguments to \connect: - // it accepts newdb, user, host, port in that order. + // it accepts newdb, user, host, port and options in that order. // Each field can be marked as "-" to reuse the current defaults. 
switch len(cmd) { + case 5: + if cmd[4] != "-" { + if cmd[4] == "autocerts" { + autoCerts = true + } else { + return errors.Newf(`unknown syntax: \c %s`, strings.Join(cmd, " ")) + } + } + fallthrough case 4: if cmd[3] != "-" { if err := newURL.SetOption("port", cmd[3]); err != nil { @@ -1757,6 +1770,12 @@ func (c *cliState) handleConnectInternal(cmd []string, omitConnString bool) erro return err } + if autoCerts { + if err := autoFillClientCerts(newURL, currURL, c.sqlCtx.CertsDir); err != nil { + return err + } + } + if err := newURL.Validate(); err != nil { return errors.Wrap(err, "validating the new URL") } @@ -2712,3 +2731,100 @@ func (c *cliState) closeOutputFile() error { c.iCtx.queryOutputBuf = nil return err } + +// autoFillClientCerts tries to discover a TLS client certificate and key +// for use in newURL. This is used from the \connect command with option +// "autocerts". +func autoFillClientCerts(newURL, currURL *pgurl.URL, extraCertsDir string) error { + username := newURL.GetUsername() + // We could use methods from package "certnames" here but we're + // avoiding extra package dependencies for the sake of keeping + // the standalone shell binary (cockroach-sql) small. + desiredKeyFile := "client." + username + ".key" + desiredCertFile := "client." + username + ".crt" + // Try to discover a TLS client certificate and key. + // First we'll try to find them in the directory specified in the shell config. + // This is coming from --certs-dir on the command line (or COCKROACH_CERTS_DIR). + // + // If not found there, we'll try to find the client cert in the + // same directory as the cert in the original URL; and the key in + // the same directory as the key in the original URL (cert and key + // may be in different places). + // + // If the original URL doesn't have a cert, we'll try to find a + // cert in the directory where the CA cert is stored. 
+ // If all else fails, we'll tell the user where we tried to search. 
+ candidateDirs := make(map[string]struct{}) + var newCert, newKey string + if extraCertsDir != "" { + candidateDirs[extraCertsDir] = struct{}{} + if candidateCert := filepath.Join(extraCertsDir, desiredCertFile); fileExists(candidateCert) { + newCert = candidateCert + } + if candidateKey := filepath.Join(extraCertsDir, desiredKeyFile); fileExists(candidateKey) { + newKey = candidateKey + } + } + if newCert == "" || newKey == "" { + var caCertDir string + if tlsUsed, _, caCertPath := currURL.GetTLSOptions(); tlsUsed { + caCertDir = filepath.Dir(caCertPath) + candidateDirs[caCertDir] = struct{}{} + } + var prevCertDir, prevKeyDir string + if authnCertEnabled, certPath, keyPath := currURL.GetAuthnCert(); authnCertEnabled { + prevCertDir = filepath.Dir(certPath) + prevKeyDir = filepath.Dir(keyPath) + candidateDirs[prevCertDir] = struct{}{} + candidateDirs[prevKeyDir] = struct{}{} + } + if newKey == "" { + if candidateKey := filepath.Join(prevKeyDir, desiredKeyFile); fileExists(candidateKey) { + newKey = candidateKey + } else if candidateKey := filepath.Join(caCertDir, desiredKeyFile); fileExists(candidateKey) { + newKey = candidateKey + } + } + if newCert == "" { + if candidateCert := filepath.Join(prevCertDir, desiredCertFile); fileExists(candidateCert) { + newCert = candidateCert + } else if candidateCert := filepath.Join(caCertDir, desiredCertFile); fileExists(candidateCert) { + newCert = candidateCert + } + } + } + if newCert == "" || newKey == "" { + err := errors.Newf("unable to find TLS client cert and key for user %q", username) + if len(candidateDirs) == 0 { + err = errors.WithHint(err, "No candidate directories; try specifying --certs-dir on the command line.") + } else { + sortedDirs := make([]string, 0, len(candidateDirs)) + for dir := range candidateDirs { + sortedDirs = append(sortedDirs, dir) + } + sort.Strings(sortedDirs) + err = errors.WithDetailf(err, "Candidate directories:\n- %s", strings.Join(sortedDirs, "\n- ")) + } + return err + } + + newURL.WithAuthn(pgurl.AuthnClientCert(newCert, newKey)) + + return nil +} + +func fileExists(path string) bool { + _, err := os.Stat(path) + if err == nil { + return true + } + if oserror.IsNotExist(err) { + return false + } + // Stat() returned an error that is not "does not exist". + // This is unexpected, but we'll treat it as if the file does exist. + // The connection will try to use the file, and then fail with a + // more specific error. 
+ return true +} diff --git a/pkg/cli/demo.go b/pkg/cli/demo.go index 93b2e8c81f88..db6e95a1fd2f 100644 --- a/pkg/cli/demo.go +++ b/pkg/cli/demo.go @@ -355,6 +355,8 @@ func runDemoInternal( } defer func() { resErr = errors.CombineErrors(resErr, conn.Close()) }() + _, _, certsDir := c.GetSQLCredentials() + sqlCtx.ShellCtx.CertsDir = certsDir sqlCtx.ShellCtx.ParseURL = clienturl.MakeURLParserFn(cmd, cliCtx.clientOpts) if err := extraInit(ctx, conn); err != nil { diff --git a/pkg/cli/interactive_tests/test_connect_cmd.tcl b/pkg/cli/interactive_tests/test_connect_cmd.tcl index 386917527ac5..7f12190d8633 100644 --- a/pkg/cli/interactive_tests/test_connect_cmd.tcl +++ b/pkg/cli/interactive_tests/test_connect_cmd.tcl @@ -110,17 +110,31 @@ eexpect foo@ eexpect "/system>" end_test +start_test "Check that the client-side connect cmd can change users with automatic client cert detection" +send "\\c - root - - autocerts\r" +eexpect "using new connection URL" +eexpect root@ +eexpect "/system>" +end_test + +start_test "Check that the auto-cert feature properly fails if certs were not found" +send "\\c - unknownuser - - autocerts\r" +eexpect "unable to find TLS client cert and key" +eexpect root@ +eexpect "/system>" +end_test + start_test "Check that the client-side connect cmd can detect syntax errors" send "\\c - - - - abc\r" eexpect "unknown syntax" -eexpect foo@ +eexpect root@ eexpect "/system>" end_test start_test "Check that the client-side connect cmd recognizes invalid URLs" send "\\c postgres://root@localhost:26257/defaultdb?sslmode=invalid&sslcert=$certs_dir%2Fclient.root.crt&sslkey=$certs_dir%2Fclient.root.key&sslrootcert=$certs_dir%2Fca.crt\r" eexpect "unrecognized sslmode parameter" -eexpect foo@ +eexpect root@ eexpect "/system>" end_test diff --git a/pkg/cli/sql_shell_cmd.go b/pkg/cli/sql_shell_cmd.go index 5b8ae110a2df..764b0491fc44 100644 --- a/pkg/cli/sql_shell_cmd.go +++ b/pkg/cli/sql_shell_cmd.go @@ -59,6 +59,7 @@ func runTerm(cmd *cobra.Command, args []string) (resErr error) { } defer func() { resErr = errors.CombineErrors(resErr, conn.Close()) }() + sqlCtx.ShellCtx.CertsDir = baseCfg.SSLCertsDir sqlCtx.ShellCtx.ParseURL = clienturl.MakeURLParserFn(cmd, cliCtx.clientOpts) return sqlCtx.Run(context.Background(), conn) } diff --git a/pkg/cmd/cockroach-sql/main.go b/pkg/cmd/cockroach-sql/main.go index 206b71a3d7fb..61b7351652b1 100644 --- a/pkg/cmd/cockroach-sql/main.go +++ b/pkg/cmd/cockroach-sql/main.go @@ -191,6 +191,7 @@ func runSQL(cmd *cobra.Command, args []string) (resErr error) { } defer func() { resErr = errors.CombineErrors(resErr, conn.Close()) }() + cfg.ShellCtx.CertsDir = copts.CertsDir cfg.ShellCtx.ParseURL = clienturl.MakeURLParserFn(cmd, copts) return cfg.Run(context.Background(), conn) } diff --git a/pkg/cmd/generate-logictest/templates.go b/pkg/cmd/generate-logictest/templates.go index e1e6d150b5cf..2178003399ca 100644 --- a/pkg/cmd/generate-logictest/templates.go +++ b/pkg/cmd/generate-logictest/templates.go @@ -257,7 +257,10 @@ go_test( size = "enormous", srcs = ["generated_test.go"],{{ if .SqliteLogicTest }} args = ["-test.timeout=7195s"],{{ else }} - args = ["-test.timeout=3595s"],{{ end }} + args = select({ + "//build/toolchains:use_ci_timeouts": ["-test.timeout=895s"], + "//conditions:default": ["-test.timeout=3595s"], + }),{{ end }} data = [ "//c-deps:libgeos", # keep{{ if .SqliteLogicTest }} "@com_github_cockroachdb_sqllogictest//:testfiles", # keep{{ end }}{{ if .CockroachGoTestserverTest }} diff --git a/pkg/cmd/roachtest/option/node_list_option.go 
b/pkg/cmd/roachtest/option/node_list_option.go index 475d78d2c956..5535a7eea2dd 100644 --- a/pkg/cmd/roachtest/option/node_list_option.go +++ b/pkg/cmd/roachtest/option/node_list_option.go @@ -73,6 +73,11 @@ func (n NodeListOption) RandNode() NodeListOption { return NodeListOption{n[rand.Intn(len(n))]} } +// SeededRandNode returns a random node from the NodeListOption using a seeded rand object. +func (n NodeListOption) SeededRandNode(rand *rand.Rand) NodeListOption { + return NodeListOption{n[rand.Intn(len(n))]} +} + // NodeIDsString returns the nodes in the NodeListOption, separated by spaces. func (n NodeListOption) NodeIDsString() string { result := "" diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go index f8eea8294209..e87a73bbdf45 100644 --- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go +++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go @@ -34,11 +34,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/roachprod/install" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/roachprod/prometheus" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -228,33 +230,33 @@ type replicateKV struct { readPercent int // This field is merely used to debug the c2c framework for finite workloads. - debugRunDurationMinutes int + debugRunDuration time.Duration - // initDuration, if nonzero, will pre-populate the src cluster - initDurationMinutes int + // the number of rows inserted into the cluster before c2c begins + initRows int + + // max size of raw data written during each insertion + maxBlockBytes int } func (kv replicateKV) sourceInitCmd(tenantName string, nodes option.NodeListOption) string { - if kv.initDurationMinutes == 0 { - return "" - } - return fmt.Sprintf(`./workload run kv --tolerate-errors --init --duration %dm --read-percent 0 {pgurl%s:%s}`, - kv.initDurationMinutes, - nodes, - tenantName) + cmd := roachtestutil.NewCommand(`./workload init kv`). + MaybeFlag(kv.initRows > 0, "insert-count", kv.initRows). + MaybeFlag(kv.initRows > 0, "max-block-bytes", kv.maxBlockBytes). + Flag("splits", 100). + Option("scatter"). + Arg("{pgurl%s:%s}", nodes, tenantName) + return cmd.String() } func (kv replicateKV) sourceRunCmd(tenantName string, nodes option.NodeListOption) string { - debugDuration := "" - if kv.debugRunDurationMinutes != 0 { - debugDuration = fmt.Sprintf("--duration %dm", kv.debugRunDurationMinutes) - } - // added --tolerate-errors flags to prevent test from flaking due to a transaction retry error - return fmt.Sprintf(`./workload run kv --tolerate-errors --init %s --read-percent %d {pgurl%s:%s}`, - debugDuration, - kv.readPercent, - nodes, - tenantName) + cmd := roachtestutil.NewCommand(`./workload run kv`). + Option("tolerate-errors"). + Flag("max-block-bytes", kv.maxBlockBytes). + Flag("read-percent", kv.readPercent). + MaybeFlag(kv.debugRunDuration > 0, "duration", kv.debugRunDuration). 
+ Arg("{pgurl%s:%s}", nodes, tenantName) + return cmd.String() } func (kv replicateKV) runDriver( @@ -338,13 +340,17 @@ type replicationDriver struct { t test.Test c cluster.Cluster metrics *c2cMetrics + rng *rand.Rand } func makeReplicationDriver(t test.Test, c cluster.Cluster, rs replicationSpec) replicationDriver { + rng, seed := randutil.NewTestRand() + t.L().Printf(`Random Seed is %d`, seed) return replicationDriver{ - t: t, - c: c, - rs: rs, + t: t, + c: c, + rs: rs, + rng: rng, } } @@ -367,8 +373,8 @@ func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluste dstClusterSetting := install.MakeClusterSettings(install.SecureOption(true)) c.Start(ctx, t.L(), dstStartOps, dstClusterSetting, dstCluster) - srcNode := srcCluster.RandNode() - destNode := dstCluster.RandNode() + srcNode := srcCluster.SeededRandNode(rd.rng) + destNode := dstCluster.SeededRandNode(rd.rng) addr, err := c.ExternalPGUrl(ctx, t.L(), srcNode, "") require.NoError(t, err) @@ -428,7 +434,12 @@ func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluste WithNodeExporter(rd.setup.dst.nodes.InstallNodes()). WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard") - require.NoError(rd.t, rd.c.StartGrafana(ctx, rd.t.L(), rd.setup.promCfg)) + // StartGrafana clutters the test.log. Try logging setup to a separate file. + promLog, err := rd.t.L().ChildLogger("prom_setup", logger.QuietStderr, logger.QuietStdout) + if err != nil { + promLog = rd.t.L() + } + require.NoError(rd.t, rd.c.StartGrafana(ctx, promLog, rd.setup.promCfg)) rd.t.L().Printf("Prom has started") } } @@ -527,13 +538,20 @@ func (rd *replicationDriver) getReplicationRetainedTime() time.Time { return retainedTime } -func (rd *replicationDriver) stopReplicationStream(ingestionJob int, cutoverTime time.Time) { +func (rd *replicationDriver) stopReplicationStream( + ctx context.Context, ingestionJob int, cutoverTime time.Time, +) { rd.setup.dst.sysSQL.Exec(rd.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, rd.setup.dst.name, cutoverTime) err := retry.ForDuration(time.Minute*5, func() error { var status string var payloadBytes []byte - rd.setup.dst.sysSQL.QueryRow(rd.t, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`, - ingestionJob).Scan(&status, &payloadBytes) + res := rd.setup.dst.sysSQL.DB.QueryRowContext(ctx, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`, ingestionJob) + if res.Err() != nil { + // This query can fail if a node shuts down during the query execution; + // therefore, tolerate errors. + return res.Err() + } + require.NoError(rd.t, res.Scan(&status, &payloadBytes)) if jobs.Status(status) == jobs.StatusFailed { payload := &jobspb.Payload{} if err := protoutil.Unmarshal(payloadBytes, payload); err == nil { @@ -593,6 +611,13 @@ func (rd *replicationDriver) main(ctx context.Context) { metricSnapper := rd.startStatsCollection(ctx) rd.preStreamingWorkload(ctx) + // Wait for initial workload to be properly replicated across the source cluster to increase + // the probability that the producer returns a topology with more than one node in it, + // else the node shutdown tests can flake. + if rd.rs.srcNodes >= 3 { + require.NoError(rd.t, WaitFor3XReplication(ctx, rd.t, rd.setup.src.db)) + } + rd.t.L().Printf("begin workload on src cluster") m := rd.newMonitor(ctx) // The roachtest driver can use the workloadCtx to cancel the workload. 
@@ -617,10 +642,12 @@ func (rd *replicationDriver) main(ctx context.Context) { rd.t.Status("starting replication stream") rd.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now()) ingestionJobID := rd.startReplicationStream(ctx) - removeTenantRateLimiters(rd.t, rd.setup.dst.sysSQL, rd.setup.dst.name) - lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, rd.t.L(), getStreamIngestionJobInfo, rd.t.Status, false) + // latency verifier queries may error during a node shutdown event; therefore + // tolerate errors if we anticipate node deaths. + lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, rd.t.L(), + getStreamIngestionJobInfo, rd.t.Status, rd.rs.expectedNodeDeaths > 0) defer lv.maybeLogLatencyHist() m.Go(func(ctx context.Context) error { @@ -655,7 +682,7 @@ func (rd *replicationDriver) main(ctx context.Context) { rd.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s", cutoverTime.String())) - rd.stopReplicationStream(ingestionJobID, cutoverTime) + rd.stopReplicationStream(ctx, ingestionJobID, cutoverTime) rd.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now()) rd.metrics.export(rd.t, len(rd.setup.src.nodes)) @@ -704,7 +731,7 @@ func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster dstNodes: 1, // The timeout field ensures the c2c roachtest driver behaves properly. timeout: 10 * time.Minute, - workload: replicateKV{readPercent: 0, debugRunDurationMinutes: 1}, + workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, maxBlockBytes: 1}, additionalDuration: 0 * time.Minute, cutover: 30 * time.Second, } @@ -751,19 +778,20 @@ func registerClusterToCluster(r registry.Registry) { dstNodes: 3, cpus: 8, pdSize: 100, - workload: replicateKV{readPercent: 0}, + workload: replicateKV{readPercent: 0, maxBlockBytes: 1024}, timeout: 1 * time.Hour, additionalDuration: 10 * time.Minute, cutover: 5 * time.Minute, tags: registry.Tags("aws"), }, { - name: "c2c/UnitTest", - srcNodes: 1, - dstNodes: 1, - cpus: 4, - pdSize: 10, - workload: replicateKV{readPercent: 0, debugRunDurationMinutes: 1}, + name: "c2c/UnitTest", + srcNodes: 1, + dstNodes: 1, + cpus: 4, + pdSize: 10, + workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, + maxBlockBytes: 1024}, timeout: 5 * time.Minute, additionalDuration: 0 * time.Minute, cutover: 30 * time.Second, @@ -868,7 +896,7 @@ func makeReplResilienceDriver( rd := makeReplicationDriver(t, c, rsp.replicationSpec) return replResilienceDriver{ replicationDriver: rd, - phase: c2cPhase(rand.Intn(int(phaseCutover) + 1)), + phase: c2cPhase(rd.rng.Intn(int(phaseCutover) + 1)), rsp: rsp, } } @@ -923,7 +951,7 @@ func (rrd *replResilienceDriver) getTargetAndWatcherNodes(ctx context.Context) { findAnotherNode := func(notThisNode int) int { for { - anotherNode := nodes.RandNode()[0] + anotherNode := nodes.SeededRandNode(rrd.rng)[0] if notThisNode != anotherNode { return anotherNode } @@ -960,12 +988,7 @@ func (rrd *replResilienceDriver) waitForTargetPhase() error { case currentPhase < rrd.phase: time.Sleep(5 * time.Second) case currentPhase == rrd.phase: - // Every C2C phase should last at least 30 seconds, so introduce a little - // bit of random waiting before node shutdown to ensure the shutdown occurs - // once we're settled into the target phase. - randomSleep := time.Duration(rand.Intn(6)) - rrd.t.L().Printf("In target phase! 
Take a %d second power nap", randomSleep) - time.Sleep(randomSleep * time.Second) + rrd.t.L().Printf("In target phase %s", currentPhase.String()) return nil default: return errors.New("c2c job past target phase") @@ -973,6 +996,16 @@ func (rrd *replResilienceDriver) waitForTargetPhase() error { } } +func (rrd *replResilienceDriver) sleepBeforeResiliencyEvent() { + // Assuming every C2C phase lasts at least 10 seconds, introduce some waiting + // before a resiliency event (e.g. a node shutdown) to ensure the event occurs + // once we're fully settled into the target phase (e.g. the stream ingestion + // processors have observed the cutover signal). + randomSleep := time.Duration(5+rrd.rng.Intn(6)) * time.Second + rrd.t.L().Printf("Take a %s power nap", randomSleep) + time.Sleep(randomSleep) +} + func registerClusterReplicationResilience(r registry.Registry) { for _, rsp := range []replResilienceSpec{ { @@ -999,7 +1032,7 @@ func registerClusterReplicationResilience(r registry.Registry) { srcNodes: 4, dstNodes: 4, cpus: 8, - workload: replicateKV{readPercent: 0, initDurationMinutes: 2}, + workload: replicateKV{readPercent: 0, initRows: 1000000, maxBlockBytes: 1024}, timeout: 20 * time.Minute, additionalDuration: 6 * time.Minute, cutover: 3 * time.Minute, @@ -1054,6 +1087,9 @@ func registerClusterReplicationResilience(r registry.Registry) { // Don't begin shutdown process until c2c job is set up. <-shutdownSetupDone + // Eagerly listen to cutover signal to exercise node shutdown during actual cutover. + rrd.setup.dst.sysSQL.Exec(t, `SET CLUSTER SETTING bulkio.stream_ingestion.cutover_signal_poll_interval='5s'`) + // While executing a node shutdown on either the src or destination // cluster, ensure the destination cluster's stream ingestion job // completes. If the stream producer job fails, no big deal-- in a real @@ -1061,7 +1097,9 @@ func registerClusterReplicationResilience(r registry.Registry) { // successful c2c replication execution. shutdownStarter := func() jobStarter { return func(c cluster.Cluster, t test.Test) (string, error) { - return fmt.Sprintf("%d", rrd.dstJobID), rrd.waitForTargetPhase() + require.NoError(t, rrd.waitForTargetPhase()) + rrd.sleepBeforeResiliencyEvent() + return fmt.Sprintf("%d", rrd.dstJobID), nil } } destinationWatcherNode := rrd.watcherNode diff --git a/pkg/cmd/roachtest/tests/multitenant_utils.go b/pkg/cmd/roachtest/tests/multitenant_utils.go index 2ca12644bcbe..7c5df2dc16a3 100644 --- a/pkg/cmd/roachtest/tests/multitenant_utils.go +++ b/pkg/cmd/roachtest/tests/multitenant_utils.go @@ -338,6 +338,8 @@ func createInMemoryTenant( sysSQL.Exec(t, "CREATE TENANT $1", tenantName) sysSQL.Exec(t, "ALTER TENANT $1 START SERVICE SHARED", tenantName) sysSQL.Exec(t, `ALTER TENANT $1 GRANT CAPABILITY can_view_node_info=true, can_admin_split=true,can_view_tsdb_metrics=true`, tenantName) + sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.split_at.allow_for_secondary_tenant.enabled=true`, tenantName) + sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.scatter.allow_for_secondary_tenant.enabled=true`, tenantName) removeTenantRateLimiters(t, sysSQL, tenantName) diff --git a/pkg/kv/kvserver/kvflowcontrol/doc.go b/pkg/kv/kvserver/kvflowcontrol/doc.go index ec784c1e1cd7..49910b2790d5 100644 --- a/pkg/kv/kvserver/kvflowcontrol/doc.go +++ b/pkg/kv/kvserver/kvflowcontrol/doc.go @@ -483,7 +483,7 @@ package kvflowcontrol // still queued after ~100ms, will trigger epoch-LIFO everywhere. // [^11]: See the implementation for kvflowcontrol.Dispatch. 
// [^12]: See UpToRaftLogPosition in AdmittedRaftLogEntries. -// [^13]: See kvflowsequencer.Sequencer and its use in kvflowhandle.Handle. +// [^13]: See admission.sequencer and its use in admission.StoreWorkQueue. // [^14]: See the high_create_time_low_position_different_range test case for // TestReplicatedWriteAdmission. // diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go index e55cb2acc4c7..de6641205a58 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go @@ -98,13 +98,11 @@ type Handle interface { // work with given priority along connected streams. The deduction is // tracked with respect to the specific raft log position it's expecting it // to end up in, log positions that monotonically increase. Requests are - // assumed to have been Admit()-ed first. The returned time.Time parameter - // is to be used as the work item's CreateTime when enqueueing in IO - // admission queues. + // assumed to have been Admit()-ed first. DeductTokensFor( - context.Context, admissionpb.WorkPriority, time.Time, + context.Context, admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens, - ) time.Time + ) // ReturnTokensUpto returns all previously deducted tokens of a given // priority for all log positions less than or equal to the one specified. // It does for the specific stream. Once returned, subsequent attempts to diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go index 46c8e6b0cfb5..a18472414715 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go @@ -183,8 +183,9 @@ func (c *Controller) Admit( } } - // TODO(irfansharif): Use the create time for ordering among waiting - // requests. Integrate it with epoch-LIFO. + // TODO(irfansharif): Use CreateTime for ordering among waiting + // requests, integrate it with epoch-LIFO. See I12 from + // kvflowcontrol/doc.go. } // DeductTokens is part of the kvflowcontrol.Controller interface. diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go index 5caf3f67343a..6610f902ba15 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go @@ -31,11 +31,6 @@ func TestDispatch(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - reverseWorkPriorityDict := make(map[string]admissionpb.WorkPriority) - for k, v := range admissionpb.WorkPriorityDict { - reverseWorkPriorityDict[v] = k - } - datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) { var dispatch *Dispatch datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { @@ -82,7 +77,7 @@ func TestDispatch(t *testing.T) { case strings.HasPrefix(parts[i], "pri="): // Parse pri=. 
- pri, found := reverseWorkPriorityDict[arg] + pri, found := admissionpb.TestingReverseWorkPriorityDict[arg] require.True(t, found) entries.AdmissionPriority = int32(pri) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel index ff07b3cc5958..0b4379c2a72b 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel @@ -14,7 +14,6 @@ go_library( "//pkg/base", "//pkg/kv/kvserver/kvflowcontrol", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", - "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker", "//pkg/util/admission/admissionpb", "//pkg/util/hlc", @@ -40,7 +39,6 @@ go_test( "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/metric", - "//pkg/util/timeutil", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go index 1c43ffeb2ab7..0ff5ebf18a10 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -41,7 +40,6 @@ type Handle struct { // (identified by their log positions) have been admitted below-raft, // streams disconnect, or the handle closed entirely. perStreamTokenTracker map[kvflowcontrol.Stream]*kvflowtokentracker.Tracker - sequencer *kvflowsequencer.Sequencer closed bool } } @@ -54,7 +52,6 @@ func New(controller kvflowcontrol.Controller, metrics *Metrics, clock *hlc.Clock clock: clock, } h.mu.perStreamTokenTracker = map[kvflowcontrol.Stream]*kvflowtokentracker.Tracker{} - h.mu.sequencer = kvflowsequencer.New() return h } @@ -104,31 +101,28 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim func (h *Handle) DeductTokensFor( ctx context.Context, pri admissionpb.WorkPriority, - ct time.Time, pos kvflowcontrolpb.RaftLogPosition, tokens kvflowcontrol.Tokens, -) time.Time { +) { if h == nil { // TODO(irfansharif): See TODO around nil receiver check in Admit(). - return ct + return } - ct, _ = h.deductTokensForInner(ctx, pri, ct, pos, tokens) - return ct + _ = h.deductTokensForInner(ctx, pri, pos, tokens) } func (h *Handle) deductTokensForInner( ctx context.Context, pri admissionpb.WorkPriority, - ct time.Time, pos kvflowcontrolpb.RaftLogPosition, tokens kvflowcontrol.Tokens, -) (sequence time.Time, streams []kvflowcontrol.Stream) { +) (streams []kvflowcontrol.Stream) { h.mu.Lock() defer h.mu.Unlock() if h.mu.closed { log.Errorf(ctx, "operating on a closed handle") - return ct, nil // unused return value in production code + return nil // unused return value in production code } for _, c := range h.mu.connections { @@ -136,7 +130,7 @@ func (h *Handle) deductTokensForInner( h.mu.perStreamTokenTracker[c.Stream()].Track(ctx, pri, tokens, pos) streams = append(streams, c.Stream()) } - return h.mu.sequencer.Sequence(ct), streams + return streams } // ReturnTokensUpto is part of the kvflowcontrol.Handle interface. 
@@ -322,9 +316,8 @@ func (h *Handle) TestingNonBlockingAdmit( func (h *Handle) TestingDeductTokensForInner( ctx context.Context, pri admissionpb.WorkPriority, - ct time.Time, pos kvflowcontrolpb.RaftLogPosition, tokens kvflowcontrol.Tokens, -) (time.Time, []kvflowcontrol.Stream) { - return h.deductTokensForInner(ctx, pri, ct, pos, tokens) +) []kvflowcontrol.Stream { + return h.deductTokensForInner(ctx, pri, pos, tokens) } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go index 72f5d7e05724..a4f1182ac19f 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) @@ -80,7 +79,7 @@ func TestHandleAdmit(t *testing.T) { // Connect a single stream at pos=0 and deplete all 16MiB of regular // tokens at pos=1. handle.ConnectStream(ctx, pos(0), stream) - handle.DeductTokensFor(ctx, admissionpb.NormalPri, time.Time{}, pos(1), kvflowcontrol.Tokens(16<<20 /* 16MiB */)) + handle.DeductTokensFor(ctx, admissionpb.NormalPri, pos(1), kvflowcontrol.Tokens(16<<20 /* 16MiB */)) // Invoke .Admit() in a separate goroutine, and test below whether // the goroutine is blocked. @@ -106,67 +105,3 @@ func TestHandleAdmit(t *testing.T) { }) } } - -// TestHandleSequencing tests the sequencing behavior of -// Handle.DeductTokensFor(), namely that we: -// - advance sequencing timestamps when the create-time advances; -// - advance sequencing timestamps when the log position advances. -func TestHandleSequencing(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - // tzero represents the t=0, the earliest possible time. All other - // create-time= is relative to this time. - var tzero = timeutil.Unix(0, 0) - - ctx := context.Background() - stream := kvflowcontrol.Stream{ - TenantID: roachpb.MustMakeTenantID(42), - StoreID: roachpb.StoreID(42), - } - pos := func(t, i uint64) kvflowcontrolpb.RaftLogPosition { - return kvflowcontrolpb.RaftLogPosition{Term: t, Index: i} - } - ct := func(d int64) time.Time { - return tzero.Add(time.Nanosecond * time.Duration(d)) - } - - const tokens = kvflowcontrol.Tokens(1 << 20 /* MiB */) - const normal = admissionpb.NormalPri - - registry := metric.NewRegistry() - clock := hlc.NewClockForTesting(nil) - controller := kvflowcontroller.New(registry, cluster.MakeTestingClusterSettings(), clock) - handle := kvflowhandle.New(controller, kvflowhandle.NewMetrics(registry), clock) - - // Test setup: handle is connected to a single stream at pos=1/0 and has - // deducted 1MiB of regular tokens at pos=1 ct=1. - handle.ConnectStream(ctx, pos(1, 0), stream) - seq0 := handle.DeductTokensFor(ctx, normal, ct(1), pos(1, 1), tokens) - - // If create-time advances, so does the sequencing timestamp. - seq1 := handle.DeductTokensFor(ctx, normal, ct(2), pos(1, 1), tokens) - require.Greater(t, seq1, seq0) - - // If stays static, the sequencing timestamp - // still advances. - seq2 := handle.DeductTokensFor(ctx, normal, ct(2), pos(1, 1), tokens) - require.Greater(t, seq2, seq1) - - // If the log index advances, so does the sequencing timestamp. 
- seq3 := handle.DeductTokensFor(ctx, normal, ct(3), pos(1, 2), tokens) - require.Greater(t, seq3, seq2) - - // If the log term advances, so does the sequencing timestamp. - seq4 := handle.DeductTokensFor(ctx, normal, ct(3), pos(2, 2), tokens) - require.Greater(t, seq4, seq3) - - // If both the create-time and log-position advance, so does the sequencing - // timestamp. - seq5 := handle.DeductTokensFor(ctx, normal, ct(1000), pos(4, 20), tokens) - require.Greater(t, seq5, seq4) - - // Verify that the sequencing timestamp is kept close to the maximum - // observed create-time. - require.LessOrEqual(t, seq5.Sub(ct(1000)), time.Nanosecond) -} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/BUILD.bazel deleted file mode 100644 index 4b7b66091b3e..000000000000 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/BUILD.bazel +++ /dev/null @@ -1,29 +0,0 @@ -load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "kvflowsequencer", - srcs = ["sequencer.go"], - importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer", - visibility = ["//visibility:public"], - deps = ["//pkg/util/timeutil"], -) - -go_test( - name = "kvflowsequencer_test", - srcs = ["sequencer_test.go"], - args = ["-test.timeout=295s"], - data = glob(["testdata/**"]), - embed = [":kvflowsequencer"], - deps = [ - "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", - "//pkg/testutils/datapathutils", - "//pkg/util/leaktest", - "//pkg/util/log", - "//pkg/util/timeutil", - "@com_github_cockroachdb_datadriven//:datadriven", - "@com_github_stretchr_testify//require", - ], -) - -get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go index 6cdc943cfc32..d898c39ce43a 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go @@ -630,7 +630,7 @@ func (h *replicaHandle) deductTokens( // Increment the quorum log position -- all token deductions are bound to // incrementing log positions. 
h.quorumLogPosition.Index += 1 - _, streams := h.handle.TestingDeductTokensForInner(ctx, pri, time.Time{}, h.quorumLogPosition, tokens) + streams := h.handle.TestingDeductTokensForInner(ctx, pri, h.quorumLogPosition, tokens) for _, stream := range streams { h.deductionTracker[stream].Track(ctx, pri, tokens, h.quorumLogPosition) } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go index 858bda480797..7793501e0b75 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go @@ -32,19 +32,10 @@ func TestTracker(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - reverseWorkPriorityDict := make(map[string]admissionpb.WorkPriority) - for k, v := range admissionpb.WorkPriorityDict { - reverseWorkPriorityDict[v] = k - } - ctx := context.Background() datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) { var tracker *Tracker - knobs := &kvflowcontrol.TestingKnobs{ - UntrackTokensInterceptor: func(tokens kvflowcontrol.Tokens, pos kvflowcontrolpb.RaftLogPosition) { - - }, - } + knobs := &kvflowcontrol.TestingKnobs{} datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "init": @@ -73,7 +64,7 @@ func TestTracker(t *testing.T) { switch { case strings.HasPrefix(parts[i], "pri="): var found bool - pri, found = reverseWorkPriorityDict[arg] + pri, found = admissionpb.TestingReverseWorkPriorityDict[arg] require.True(t, found) case strings.HasPrefix(parts[i], "tokens="): @@ -103,7 +94,7 @@ func TestTracker(t *testing.T) { var priStr, logPositionStr string d.ScanArgs(t, "pri", &priStr) d.ScanArgs(t, "up-to-log-position", &logPositionStr) - pri, found := reverseWorkPriorityDict[priStr] + pri, found := admissionpb.TestingReverseWorkPriorityDict[priStr] require.True(t, found) logPosition := parseLogPosition(t, logPositionStr) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 097cb297c42e..72cea1abf667 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -2298,8 +2298,7 @@ func (rpcCtx *Context) grpcDialNodeInternal( // Run the heartbeat; this will block until the connection breaks for // whatever reason. We don't actually have to do anything with the error, // so we ignore it. 
- err := rpcCtx.runHeartbeat(ctx, conn, target) - log.Infof(ctx, "connection heartbeat loop ended with err: %v", err) + _ = rpcCtx.runHeartbeat(ctx, conn, target) maybeFatal(ctx, rpcCtx.m.Remove(k, conn)) // Context gets canceled on server shutdown, and if that's likely why diff --git a/pkg/server/server.go b/pkg/server/server.go index 1d6b6d279f6f..87498039a2cf 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -269,7 +269,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { if opts, ok := cfg.TestingKnobs.AdmissionControl.(*admission.Options); ok { admissionOptions.Override(opts) } - gcoords := admission.NewGrantCoordinators(cfg.AmbientCtx, st, admissionOptions, registry) + gcoords := admission.NewGrantCoordinators(cfg.AmbientCtx, st, admissionOptions, registry, &admission.NoopOnLogEntryAdmitted{}) engines, err := cfg.CreateEngines(ctx) if err != nil { diff --git a/pkg/util/admission/BUILD.bazel b/pkg/util/admission/BUILD.bazel index 3ed968ba72fa..e2dbecf8603b 100644 --- a/pkg/util/admission/BUILD.bazel +++ b/pkg/util/admission/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "kv_slot_adjuster.go", "pacer.go", "scheduler_latency_listener.go", + "sequencer.go", "sql_cpu_overload_indicator.go", "store_token_estimation.go", "testing_knobs.go", @@ -56,6 +57,7 @@ go_test( "io_load_listener_test.go", "replicated_write_admission_test.go", "scheduler_latency_listener_test.go", + "sequencer_test.go", "store_token_estimation_test.go", "tokens_linear_model_test.go", "work_queue_test.go", diff --git a/pkg/util/admission/admission.go b/pkg/util/admission/admission.go index e59d16927952..5a79d6d358c8 100644 --- a/pkg/util/admission/admission.go +++ b/pkg/util/admission/admission.go @@ -178,7 +178,7 @@ type granter interface { // is a possibility that that raced with cancellation. // // Do not use this for doing store IO-related token adjustments when work is - // done -- that should be done via granterWithStoreWriteDone.storeWriteDone. + // done -- that should be done via granterWithStoreReplicatedWorkAdmitted.storeWriteDone. // // REQUIRES: count > 0. count == 1 for slots. returnGrant(count int64) @@ -195,7 +195,7 @@ type granter interface { // work turned out to be an underestimate. // // Do not use this for doing store IO-related token adjustments when work is - // done -- that should be done via granterWithStoreWriteDone.storeWriteDone. + // done -- that should be done via granterWithStoreReplicatedWorkAdmitted.storeWriteDone. // // REQUIRES: count > 0. count == 1 for slots. tookWithoutPermission(count int64) @@ -274,23 +274,33 @@ type granterWithIOTokens interface { // getDiskTokensUsedAndReset returns the disk bandwidth tokens used // since the last such call. getDiskTokensUsedAndReset() [admissionpb.NumWorkClasses]int64 - // setAdmittedDoneModelsLocked supplies the models to use when - // storeWriteDone is called, to adjust token consumption. Note that these - // models are not used for token adjustment at admission time -- that is - // handled by StoreWorkQueue and is not in scope of this granter. This - // asymmetry is due to the need to use all the functionality of WorkQueue at - // admission time. See the long explanatory comment at the beginning of - // store_token_estimation.go, regarding token estimation. 
- setAdmittedDoneModels(l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, - ingestLM tokensLinearModel) + // setLinearModels supplies the models to use when storeWriteDone or + // storeReplicatedWorkAdmittedLocked is called, to adjust token consumption. + // Note that these models are not used for token adjustment at admission + // time -- that is handled by StoreWorkQueue and is not in scope of this + // granter. This asymmetry is due to the need to use all the functionality + // of WorkQueue at admission time. See the long explanatory comment at the + // beginning of store_token_estimation.go, regarding token estimation. + setLinearModels(l0WriteLM, l0IngestLM, ingestLM tokensLinearModel) } -// granterWithStoreWriteDone is used to abstract kvStoreTokenGranter for -// testing. The interface is used by StoreWorkQueue to pass on sizing -// information provided when the work was completed. -type granterWithStoreWriteDone interface { +// granterWithStoreReplicatedWorkAdmitted is used to abstract +// kvStoreTokenGranter for testing. The interface is used by StoreWorkQueue to +// pass on sizing information provided when the work is either done (for legacy, +// above-raft IO admission) or admitted (for below-raft, asynchronous admission +// control). +type granterWithStoreReplicatedWorkAdmitted interface { granter + // storeWriteDone is used by legacy, above-raft IO admission control to + // inform granters of when the write was actually done, post-admission. At + // admit-time we did not have sizing info for these writes, so by + // intercepting these writes at admit time we're able to make any + // outstanding token adjustments in the granter. storeWriteDone(originalTokens int64, doneInfo StoreWorkDoneInfo) (additionalTokens int64) + // storeReplicatedWorkAdmittedLocked is used by below-raft admission control + // to inform granters of work being admitted in order for them to make any + // outstanding token adjustments. It's invoked with the coord.mu held. + storeReplicatedWorkAdmittedLocked(originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo) (additionalTokens int64) } // cpuOverloadIndicator is meant to be an instantaneous indicator of cpu diff --git a/pkg/util/admission/admissionpb/admissionpb.go b/pkg/util/admission/admissionpb/admissionpb.go index 57dc9911080f..c230a5d11c38 100644 --- a/pkg/util/admission/admissionpb/admissionpb.go +++ b/pkg/util/admission/admissionpb/admissionpb.go @@ -53,7 +53,7 @@ func (w WorkPriority) SafeFormat(p redact.SafePrinter, verb rune) { p.Print(s) return } - p.Printf("custom-pri=%d", w) + p.Printf("custom-pri=%d", int8(w)) } // WorkPriorityDict is a mapping of the priorities to a short string name. The @@ -69,6 +69,17 @@ var WorkPriorityDict = map[WorkPriority]string{ HighPri: "high-pri", } +// TestingReverseWorkPriorityDict is the reverse-lookup dictionary for +// WorkPriorityDict, for use in tests. +var TestingReverseWorkPriorityDict map[string]WorkPriority + +func init() { + TestingReverseWorkPriorityDict = make(map[string]WorkPriority) + for k, v := range WorkPriorityDict { + TestingReverseWorkPriorityDict[v] = k + } +} + // WorkClass represents the class of work, which is defined entirely by its // WorkPriority. 
Namely, everything less than NormalPri is defined to be // "Elastic", while everything above and including NormalPri is considered diff --git a/pkg/util/admission/elastic_cpu_work_handle.go b/pkg/util/admission/elastic_cpu_work_handle.go index 85c5561304b5..42e60594419c 100644 --- a/pkg/util/admission/elastic_cpu_work_handle.go +++ b/pkg/util/admission/elastic_cpu_work_handle.go @@ -151,9 +151,8 @@ func TestingNewElasticCPUHandle() *ElasticCPUWorkHandle { return newElasticCPUWorkHandle(420 * time.Hour) // use a very high allotment } -// TestingNewElasticCPUWithCallback constructs an -// ElascticCPUWorkHandle with a testing override for the behaviour of -// OverLimit(). +// TestingNewElasticCPUHandleWithCallback constructs an ElasticCPUWorkHandle +// with a testing override for the behaviour of OverLimit(). func TestingNewElasticCPUHandleWithCallback(cb func() (bool, time.Duration)) *ElasticCPUWorkHandle { h := TestingNewElasticCPUHandle() h.testingOverrideOverLimit = cb diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go index edf2b24b28e9..582bc5d6c7d0 100644 --- a/pkg/util/admission/grant_coordinator.go +++ b/pkg/util/admission/grant_coordinator.go @@ -61,6 +61,7 @@ type StoreGrantCoordinators struct { // api. numStores int pebbleMetricsProvider PebbleMetricsProvider + onLogEntryAdmitted OnLogEntryAdmitted closeCh chan struct{} disableTickerForTesting bool @@ -157,7 +158,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID) // This is IO work, so override the usesTokens value. opts.usesTokens = true // TODO(sumeer): add per-store WorkQueue state for debug.zip and db console. - granters := [admissionpb.NumWorkClasses]granterWithStoreWriteDone{ + granters := [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted{ &kvStoreTokenChildGranter{ workClass: admissionpb.RegularWorkClass, parent: kvg, @@ -168,7 +169,17 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID) }, } - storeReq := sgc.makeStoreRequesterFunc(sgc.ambientCtx, storeID, granters, sgc.settings, sgc.workQueueMetrics, opts, nil) + storeReq := sgc.makeStoreRequesterFunc( + sgc.ambientCtx, + storeID, + granters, + sgc.settings, + sgc.workQueueMetrics, + opts, + nil, /* knobs */ + sgc.onLogEntryAdmitted, + &coord.mu.Mutex, + ) coord.queues[KVWork] = storeReq requesters := storeReq.getRequesters() kvg.regularRequester = requesters[admissionpb.RegularWorkClass] @@ -336,8 +347,9 @@ type makeRequesterFunc func( metrics *WorkQueueMetrics, opts workQueueOptions) requester type makeStoreRequesterFunc func( - _ log.AmbientContext, storeID roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + _ log.AmbientContext, storeID roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, + onLogEntryAdmitted OnLogEntryAdmitted, coordMu *syncutil.Mutex, ) storeRequester // NewGrantCoordinators constructs GrantCoordinators and WorkQueues for a @@ -356,13 +368,17 @@ type makeStoreRequesterFunc func( // GrantCoordinators since they are not trying to control CPU usage, so we turn // off grant chaining in those coordinators. 
func NewGrantCoordinators( - ambientCtx log.AmbientContext, st *cluster.Settings, opts Options, registry *metric.Registry, + ambientCtx log.AmbientContext, + st *cluster.Settings, + opts Options, + registry *metric.Registry, + onLogEntryAdmitted OnLogEntryAdmitted, ) GrantCoordinators { metrics := makeGrantCoordinatorMetrics() registry.AddMetricStruct(metrics) return GrantCoordinators{ - Stores: makeStoresGrantCoordinators(ambientCtx, opts, st, metrics, registry), + Stores: makeStoresGrantCoordinators(ambientCtx, opts, st, metrics, registry, onLogEntryAdmitted), Regular: makeRegularGrantCoordinator(ambientCtx, opts, st, metrics, registry), Elastic: makeElasticGrantCoordinator(ambientCtx, st, registry), } @@ -399,6 +415,7 @@ func makeStoresGrantCoordinators( st *cluster.Settings, metrics GrantCoordinatorMetrics, registry *metric.Registry, + onLogEntryAdmitted OnLogEntryAdmitted, ) *StoreGrantCoordinators { // These metrics are shared across all stores and broken down by priority for // the common priorities. @@ -417,6 +434,7 @@ func makeStoresGrantCoordinators( makeStoreRequesterFunc: makeStoreRequester, kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration, workQueueMetrics: storeWorkQueueMetrics, + onLogEntryAdmitted: onLogEntryAdmitted, } return storeCoordinators } diff --git a/pkg/util/admission/granter.go b/pkg/util/admission/granter.go index 8787020d0886..a117e6141861 100644 --- a/pkg/util/admission/granter.go +++ b/pkg/util/admission/granter.go @@ -324,7 +324,7 @@ type kvStoreTokenChildGranter struct { parent *kvStoreTokenGranter } -var _ granterWithStoreWriteDone = &kvStoreTokenChildGranter{} +var _ granterWithStoreReplicatedWorkAdmitted = &kvStoreTokenChildGranter{} var _ granter = &kvStoreTokenChildGranter{} // grantKind implements granter. @@ -352,11 +352,23 @@ func (cg *kvStoreTokenChildGranter) continueGrantChain(grantChainID grantChainID // Ignore since grant chains are not used for store tokens. } -// storeWriteDone implements granterWithStoreWriteDone. +// storeWriteDone implements granterWithStoreReplicatedWorkAdmitted. func (cg *kvStoreTokenChildGranter) storeWriteDone( originalTokens int64, doneInfo StoreWorkDoneInfo, ) (additionalTokens int64) { - return cg.parent.storeWriteDone(cg.workClass, originalTokens, doneInfo) + cg.parent.coord.mu.Lock() + defer cg.parent.coord.mu.Unlock() + // NB: the token/metric adjustments we want to make here are the same as we + // want to make through the storeReplicatedWorkAdmittedLocked, so we (ab)use it. + return cg.parent.storeReplicatedWorkAdmittedLocked( + cg.workClass, originalTokens, storeReplicatedWorkAdmittedInfo(doneInfo)) +} + +// storeReplicatedWorkAdmitted implements granterWithStoreReplicatedWorkAdmitted. +func (cg *kvStoreTokenChildGranter) storeReplicatedWorkAdmittedLocked( + originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, +) (additionalTokens int64) { + return cg.parent.storeReplicatedWorkAdmittedLocked(cg.workClass, originalTokens, admittedInfo) } func (sg *kvStoreTokenGranter) tryGet(workClass admissionpb.WorkClass, count int64) bool { @@ -522,7 +534,7 @@ func (sg *kvStoreTokenGranter) getDiskTokensUsedAndReset() [admissionpb.NumWorkC } // setAdmittedModelsLocked implements granterWithIOTokens. 
-func (sg *kvStoreTokenGranter) setAdmittedDoneModels( +func (sg *kvStoreTokenGranter) setLinearModels( l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel, ) { sg.coord.mu.Lock() @@ -532,37 +544,21 @@ func (sg *kvStoreTokenGranter) setAdmittedDoneModels( sg.ingestLM = ingestLM } -// storeWriteDone implements granterWithStoreWriteDone. -func (sg *kvStoreTokenGranter) storeWriteDone( - wc admissionpb.WorkClass, originalTokens int64, doneInfo StoreWorkDoneInfo, +func (sg *kvStoreTokenGranter) storeReplicatedWorkAdmittedLocked( + wc admissionpb.WorkClass, originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, ) (additionalTokens int64) { - // Normally, we follow the structure of a foo() method calling into a foo() - // method on the GrantCoordinator, which then calls fooLocked() on the - // kvStoreTokenGranter. For example, returnGrant follows this structure. - // This allows the GrantCoordinator to do two things (a) acquire the mu - // before calling into kvStoreTokenGranter, (b) do side-effects, like - // terminating grant chains and doing more grants after the call into the - // fooLocked() method. - // For storeWriteDone we don't bother with this structure involving the - // GrantCoordinator (which has served us well across various methods and - // various granter implementations), since the decision on when the - // GrantCoordinator should call tryGrantLocked is more complicated. And since this - // storeWriteDone is unique to the kvStoreTokenGranter (and not implemented - // by other granters) this approach seems acceptable. - // Reminder: coord.mu protects the state in the kvStoreTokenGranter. - sg.coord.mu.Lock() exhaustedFunc := func() bool { return sg.coordMu.availableIOTokens <= 0 || (wc == admissionpb.ElasticWorkClass && sg.coordMu.elasticDiskBWTokensAvailable <= 0) } wasExhausted := exhaustedFunc() - actualL0WriteTokens := sg.l0WriteLM.applyLinearModel(doneInfo.WriteBytes) - actualL0IngestTokens := sg.l0IngestLM.applyLinearModel(doneInfo.IngestedBytes) + actualL0WriteTokens := sg.l0WriteLM.applyLinearModel(admittedInfo.WriteBytes) + actualL0IngestTokens := sg.l0IngestLM.applyLinearModel(admittedInfo.IngestedBytes) actualL0Tokens := actualL0WriteTokens + actualL0IngestTokens additionalL0TokensNeeded := actualL0Tokens - originalTokens sg.subtractTokensLocked(additionalL0TokensNeeded, false) - actualIngestTokens := sg.ingestLM.applyLinearModel(doneInfo.IngestedBytes) + actualIngestTokens := sg.ingestLM.applyLinearModel(admittedInfo.IngestedBytes) additionalDiskBWTokensNeeded := (actualL0WriteTokens + actualIngestTokens) - originalTokens if wc == admissionpb.ElasticWorkClass { sg.coordMu.elasticDiskBWTokensAvailable -= additionalDiskBWTokensNeeded @@ -574,7 +570,6 @@ func (sg *kvStoreTokenGranter) storeWriteDone( sg.coord.tryGrantLocked() } } - sg.coord.mu.Unlock() // For multi-tenant fairness accounting, we choose to ignore disk bandwidth // tokens. 
Ideally, we'd have multiple resource dimensions for the fairness // decisions, but we don't necessarily need something more sophisticated diff --git a/pkg/util/admission/granter_test.go b/pkg/util/admission/granter_test.go index c085950b12ce..9fc6555dfdd9 100644 --- a/pkg/util/admission/granter_test.go +++ b/pkg/util/admission/granter_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble" "github.com/stretchr/testify/require" @@ -97,7 +98,7 @@ func TestGranterBasic(t *testing.T) { return req } delayForGrantChainTermination = 0 - coords := NewGrantCoordinators(ambientCtx, settings, opts, registry) + coords := NewGrantCoordinators(ambientCtx, settings, opts, registry, &NoopOnLogEntryAdmitted{}) defer coords.Close() coord = coords.Regular return flushAndReset() @@ -109,8 +110,9 @@ func TestGranterBasic(t *testing.T) { storeCoordinators := &StoreGrantCoordinators{ settings: settings, makeStoreRequesterFunc: func( - ambientCtx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + ambientCtx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, + _ OnLogEntryAdmitted, _ *syncutil.Mutex, ) storeRequester { makeTestRequester := func(wc admissionpb.WorkClass) *testRequester { req := &testRequester{ @@ -148,7 +150,7 @@ func TestGranterBasic(t *testing.T) { kvStoreGranter := coord.granters[KVWork].(*kvStoreTokenGranter) // Use the same model for all 3 kinds of models. 
tlm := tokensLinearModel{multiplier: 0.5, constant: 50} - kvStoreGranter.setAdmittedDoneModels(tlm, tlm, tlm) + kvStoreGranter.setLinearModels(tlm, tlm, tlm) return flushAndReset() case "set-has-waiting-requests": @@ -232,7 +234,7 @@ func TestGranterBasic(t *testing.T) { var origTokens, writeBytes int d.ScanArgs(t, "orig-tokens", &origTokens) d.ScanArgs(t, "write-bytes", &writeBytes) - requesters[scanWorkKind(t, d)].granter.(granterWithStoreWriteDone).storeWriteDone( + requesters[scanWorkKind(t, d)].granter.(granterWithStoreReplicatedWorkAdmitted).storeWriteDone( int64(origTokens), StoreWorkDoneInfo{WriteBytes: int64(writeBytes)}) coord.testingTryGrant() return flushAndReset() @@ -274,8 +276,8 @@ func TestStoreCoordinators(t *testing.T) { opts := Options{ makeRequesterFunc: makeRequesterFunc, makeStoreRequesterFunc: func( - ctx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, - settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, _ *TestingKnobs) storeRequester { + ctx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, + settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, _ *TestingKnobs, _ OnLogEntryAdmitted, _ *syncutil.Mutex) storeRequester { reqReg := makeRequesterFunc(ctx, KVWork, granters[admissionpb.RegularWorkClass], settings, metrics, opts) reqElastic := makeRequesterFunc(ctx, KVWork, granters[admissionpb.ElasticWorkClass], settings, metrics, opts) str := &storeTestRequester{} @@ -286,7 +288,7 @@ func TestStoreCoordinators(t *testing.T) { return str }, } - coords := NewGrantCoordinators(ambientCtx, settings, opts, registry) + coords := NewGrantCoordinators(ambientCtx, settings, opts, registry, &NoopOnLogEntryAdmitted{}) // There is only 1 KVWork requester at this point in initialization, for the // Regular GrantCoordinator. 
require.Equal(t, 1, len(requesters)) diff --git a/pkg/util/admission/io_load_listener.go b/pkg/util/admission/io_load_listener.go index f7b65d20ca21..028ad32f2e38 100644 --- a/pkg/util/admission/io_load_listener.go +++ b/pkg/util/admission/io_load_listener.go @@ -416,8 +416,8 @@ func (io *ioLoadListener) adjustTokens(ctx context.Context, metrics StoreMetrics io.copyAuxEtcFromPerWorkEstimator() requestEstimates := io.perWorkTokenEstimator.getStoreRequestEstimatesAtAdmission() io.kvRequester.setStoreRequestEstimates(requestEstimates) - l0WriteLM, l0IngestLM, ingestLM := io.perWorkTokenEstimator.getModelsAtAdmittedDone() - io.kvGranter.setAdmittedDoneModels(l0WriteLM, l0IngestLM, ingestLM) + l0WriteLM, l0IngestLM, ingestLM := io.perWorkTokenEstimator.getModelsAtDone() + io.kvGranter.setLinearModels(l0WriteLM, l0IngestLM, ingestLM) if _, overloaded := io.ioThreshold.Score(); overloaded || io.aux.doLogFlush || io.elasticDiskBWTokens != unlimitedTokens { log.Infof(ctx, "IO overload: %s", io.adjustTokensResult) @@ -433,7 +433,7 @@ func (io *ioLoadListener) copyAuxEtcFromPerWorkEstimator() { io.adjustTokensResult.aux.perWorkTokensAux = io.perWorkTokenEstimator.aux requestEstimates := io.perWorkTokenEstimator.getStoreRequestEstimatesAtAdmission() io.adjustTokensResult.requestEstimates = requestEstimates - l0WriteLM, l0IngestLM, ingestLM := io.perWorkTokenEstimator.getModelsAtAdmittedDone() + l0WriteLM, l0IngestLM, ingestLM := io.perWorkTokenEstimator.getModelsAtDone() io.adjustTokensResult.l0WriteLM = l0WriteLM io.adjustTokensResult.l0IngestLM = l0IngestLM io.adjustTokensResult.ingestLM = ingestLM diff --git a/pkg/util/admission/io_load_listener_test.go b/pkg/util/admission/io_load_listener_test.go index b10507d9cd7d..2f2bfc0cc425 100644 --- a/pkg/util/admission/io_load_listener_test.go +++ b/pkg/util/admission/io_load_listener_test.go @@ -370,7 +370,7 @@ func (g *testGranterWithIOTokens) getDiskTokensUsedAndReset() [admissionpb.NumWo return g.diskBandwidthTokensUsed } -func (g *testGranterWithIOTokens) setAdmittedDoneModels( +func (g *testGranterWithIOTokens) setLinearModels( l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel, ) { fmt.Fprintf(&g.buf, "setAdmittedDoneModelsLocked: l0-write-lm: ") @@ -409,7 +409,7 @@ func (g *testGranterNonNegativeTokens) getDiskTokensUsedAndReset() [admissionpb. 
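The renames above (getModelsAtAdmittedDone to getModelsAtDone, setAdmittedDoneModels to setLinearModels) don't change the flow: on each adjustment tick the IO load listener pulls the freshly fitted models from the per-work estimator and installs them on the granter. A simplified sketch of that propagation, with stand-in interfaces in place of the real ioLoadListener, storePerWorkTokenEstimator, and granterWithIOTokens types:

```go
package main

import "fmt"

// model is a stand-in for tokensLinearModel.
type model struct {
	multiplier float64
	constant   int64
}

// estimator is a stand-in for storePerWorkTokenEstimator: it fits models from
// observed LSM stats and exposes them for the granter to consume.
type estimator interface {
	getModelsAtDone() (l0Write, l0Ingest, ingest model)
}

// granter is a stand-in for granterWithIOTokens.
type granter interface {
	setLinearModels(l0Write, l0Ingest, ingest model)
}

// adjustTick mirrors the relevant slice of ioLoadListener.adjustTokens: fetch
// the current models and push them to the granter.
func adjustTick(e estimator, g granter) {
	l0w, l0i, i := e.getModelsAtDone()
	g.setLinearModels(l0w, l0i, i)
}

type fixedEstimator struct{ l0w, l0i, i model }

func (f fixedEstimator) getModelsAtDone() (model, model, model) { return f.l0w, f.l0i, f.i }

type printingGranter struct{}

func (printingGranter) setLinearModels(l0w, l0i, i model) {
	fmt.Printf("installed models: l0-write=%+v l0-ingest=%+v ingest=%+v\n", l0w, l0i, i)
}

func main() {
	e := fixedEstimator{
		l0w: model{multiplier: 0.5, constant: 50},
		l0i: model{multiplier: 1.0},
		i:   model{multiplier: 1.0},
	}
	adjustTick(e, printingGranter{})
}
```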
return [admissionpb.NumWorkClasses]int64{} } -func (g *testGranterNonNegativeTokens) setAdmittedDoneModels( +func (g *testGranterNonNegativeTokens) setLinearModels( l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel, ) { require.LessOrEqual(g.t, 0.5, l0WriteLM.multiplier) diff --git a/pkg/util/admission/replicated_write_admission_test.go b/pkg/util/admission/replicated_write_admission_test.go index d25adb9895bb..4d1e368922dd 100644 --- a/pkg/util/admission/replicated_write_admission_test.go +++ b/pkg/util/admission/replicated_write_admission_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/datadriven" @@ -112,14 +113,15 @@ func TestReplicatedWriteAdmission(t *testing.T) { printTrimmedBytes(originalTokens), rwi.RangeID, rwi.Origin, rwi.LogPosition, ingested) }, } + var mockCoordMu syncutil.Mutex storeWorkQueue = makeStoreWorkQueue( log.MakeTestingAmbientContext(tracing.NewTracer()), roachpb.StoreID(1), - [admissionpb.NumWorkClasses]granterWithStoreWriteDone{ + [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted{ tg[admissionpb.RegularWorkClass], tg[admissionpb.ElasticWorkClass], }, - st, metrics, opts, knobs, + st, metrics, opts, knobs, &NoopOnLogEntryAdmitted{}, &mockCoordMu, ).(*StoreWorkQueue) tg[admissionpb.RegularWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.RegularWorkClass] tg[admissionpb.ElasticWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.ElasticWorkClass] @@ -137,7 +139,7 @@ func TestReplicatedWriteAdmission(t *testing.T) { // Parse pri=. d.ScanArgs(t, "pri", &arg) - pri, found := reverseWorkPriorityDict[arg] + pri, found := admissionpb.TestingReverseWorkPriorityDict[arg] require.True(t, found) // Parse size=. @@ -369,15 +371,6 @@ func printWorkQueue(q *WorkQueue) string { // create-time= is relative to this time. 
var tzero = timeutil.Unix(0, 0) -var reverseWorkPriorityDict map[string]admissionpb.WorkPriority - -func init() { - reverseWorkPriorityDict = make(map[string]admissionpb.WorkPriority) - for k, v := range admissionpb.WorkPriorityDict { - reverseWorkPriorityDict[v] = k - } -} - type testReplicatedWriteGranter struct { t *testing.T wc admissionpb.WorkClass @@ -387,7 +380,7 @@ type testReplicatedWriteGranter struct { tokens int64 } -var _ granterWithStoreWriteDone = &testReplicatedWriteGranter{} +var _ granterWithStoreReplicatedWorkAdmitted = &testReplicatedWriteGranter{} func newTestReplicatedWriteGranter( t *testing.T, wc admissionpb.WorkClass, buf *builderWithMu, @@ -445,3 +438,10 @@ func (tg *testReplicatedWriteGranter) storeWriteDone( tg.tokens -= originalTokens return 0 } + +func (tg *testReplicatedWriteGranter) storeReplicatedWorkAdmittedLocked( + originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, +) (additionalTokens int64) { + tg.tokens -= originalTokens + return 0 +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer.go b/pkg/util/admission/sequencer.go similarity index 52% rename from pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer.go rename to pkg/util/admission/sequencer.go index 9cc0271f4257..05b389cc9310 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer.go +++ b/pkg/util/admission/sequencer.go @@ -8,15 +8,9 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package kvflowsequencer +package admission -import ( - "time" - - "github.com/cockroachdb/cockroach/pkg/util/timeutil" -) - -// Sequencer issues monotonic sequencing timestamps derived from observed +// sequencer issues monotonic sequencing timestamps derived from observed // CreateTimes. This is a purpose-built data structure for replication admission // control where we want to assign each AC-queued work below-raft a "sequence // number" for FIFO ordering within a <tenant,priority>. We ensure timestamps @@ -27,9 +21,29 @@ // // It's not safe for concurrent access. // +// ---- +// +// Aside: Why not do this CreateTime-generation above raft? This is because these +// sequence numbers are encoded as part of the raft proposal[3], and at +// encode-time, we don't actually know what log position the proposal is going +// to end up in. It's hard to explicitly guarantee that a proposal with +// log-position P1 will get encoded before another with log position P2, where +// P1 < P2. +// +// If we tried to "approximate" CreateTimes at proposal-encode-time, +// approximating log position order, it could result in over-admission. This is +// because of how we return flow tokens -- up to some log index[4], and how we use +// these sequence numbers in below-raft WorkQueues. If P2 ends up with a lower +// sequence number/CreateTime, it would get admitted first, and when returning +// flow tokens by log position, in specifying up-to-P2, we'll early return P1's +// flow tokens despite it not being admitted. So we'd over-admit at the sender. +// This is all within a <tenant,priority> pair. +// // [1]: See I12 from kvflowcontrol/doc.go. -// [2]: See kvflowhandle.Handle. -type Sequencer struct { +// [2]: See kvadmission.AdmitRaftEntry. +// [3]: In kvflowcontrolpb.RaftAdmissionMeta. +// [4]: See kvflowcontrolpb.AdmittedRaftLogEntries. +type sequencer struct { // maxCreateTime ratchets to the highest observed CreateTime. If sequencing // work with lower CreateTimes, we continue generating monotonic sequence // numbers by incrementing it for every such sequencing attempt.
Provided @@ -38,18 +52,12 @@ type Sequencer struct { maxCreateTime int64 } -// New returns a new Sequencer. -func New() *Sequencer { - return &Sequencer{} -} - -// Sequence returns a monotonically increasing timestamps derived from the +// sequence returns a monotonically increasing timestamps derived from the // provided CreateTime. -func (s *Sequencer) Sequence(ct time.Time) time.Time { - createTime := ct.UnixNano() +func (s *sequencer) sequence(createTime int64) int64 { if createTime <= s.maxCreateTime { createTime = s.maxCreateTime + 1 } s.maxCreateTime = createTime - return timeutil.FromUnixNanos(createTime) + return createTime } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer_test.go b/pkg/util/admission/sequencer_test.go similarity index 64% rename from pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer_test.go rename to pkg/util/admission/sequencer_test.go index cc97d1a86047..968c47ae2629 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer_test.go +++ b/pkg/util/admission/sequencer_test.go @@ -8,16 +8,13 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package kvflowsequencer +package admission import ( "fmt" - "strconv" - "strings" "testing" "time" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -30,13 +27,13 @@ func TestSequencer(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - var sequencer *Sequencer + var seq *sequencer var lastSeqNum int64 datadriven.RunTest(t, datapathutils.TestDataPath(t, "sequencer"), func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "init": - sequencer = New() + seq = &sequencer{} return "" case "sequence": @@ -49,8 +46,8 @@ func TestSequencer(t *testing.T) { // Parse log-position=/. logPosition := parseLogPosition(t, d) - _ = logPosition - sequenceNum := sequencer.Sequence(tzero.Add(dur)).UnixNano() + _ = logPosition // unused + sequenceNum := seq.sequence(tzero.Add(dur).UnixNano()) if lastSeqNum < sequenceNum { movement = " (advanced)" } @@ -67,23 +64,3 @@ func TestSequencer(t *testing.T) { }, ) } - -// tzero represents the t=0, the earliest possible time. All other -// create-time= is relative to this time. -var tzero = timeutil.Unix(0, 0) - -func parseLogPosition(t *testing.T, d *datadriven.TestData) kvflowcontrolpb.RaftLogPosition { - // Parse log-position=/. - var arg string - d.ScanArgs(t, "log-position", &arg) - inner := strings.Split(arg, "/") - require.Len(t, inner, 2) - term, err := strconv.Atoi(inner[0]) - require.NoError(t, err) - index, err := strconv.Atoi(inner[1]) - require.NoError(t, err) - return kvflowcontrolpb.RaftLogPosition{ - Term: uint64(term), - Index: uint64(index), - } -} diff --git a/pkg/util/admission/store_token_estimation.go b/pkg/util/admission/store_token_estimation.go index 2e2eca842b8e..ea3b83edb863 100644 --- a/pkg/util/admission/store_token_estimation.go +++ b/pkg/util/admission/store_token_estimation.go @@ -12,6 +12,12 @@ package admission import "github.com/cockroachdb/pebble" +// TODO(irfansharif): This comment is a bit stale with replication admission +// control where admission is asynchronous. AC is informed of the write when +// it's being physically done, so we know its size then. We don't need upfront +// estimates anymore. 
The AdmittedWorkDone interface and surrounding types +// (StoreWorkDoneInfo for ex.) are no longer central. +// // The logic in this file deals with token estimation for a store write in two // situations: (a) at admission time, (b) when the admitted work is done. At // (a) we have no information provided about the work size (NB: this choice is @@ -105,7 +111,13 @@ const ingestMultiplierMin = 0.5 const ingestMultiplierMax = 1.5 type storePerWorkTokenEstimator struct { - atAdmissionWorkTokens int64 + atAdmissionWorkTokens int64 + + // TODO(irfansharif): The linear model fitters below are actually not used + // for upfront per-work token estimation. They're used in the granter to + // figure out the rate of tokens to produce. This code organization is + // confusing -- rename the type? + atDoneL0WriteTokensLinearModel tokensLinearModelFitter atDoneL0IngestTokensLinearModel tokensLinearModelFitter // Unlike the models above that model bytes into L0, this model computes all @@ -238,7 +250,7 @@ func (e *storePerWorkTokenEstimator) getStoreRequestEstimatesAtAdmission() store return storeRequestEstimates{writeTokens: e.atAdmissionWorkTokens} } -func (e *storePerWorkTokenEstimator) getModelsAtAdmittedDone() ( +func (e *storePerWorkTokenEstimator) getModelsAtDone() ( l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel, diff --git a/pkg/util/admission/store_token_estimation_test.go b/pkg/util/admission/store_token_estimation_test.go index b2898f203b46..6f65c3da8894 100644 --- a/pkg/util/admission/store_token_estimation_test.go +++ b/pkg/util/admission/store_token_estimation_test.go @@ -77,7 +77,7 @@ func TestStorePerWorkTokenEstimator(t *testing.T) { admissionStats.statsToIgnore.Bytes += uint64(ignoreIngestedIntoL0) } estimator.updateEstimates(l0Metrics, cumLSMIngestedBytes, admissionStats) - wL0lm, iL0lm, ilm := estimator.getModelsAtAdmittedDone() + wL0lm, iL0lm, ilm := estimator.getModelsAtDone() require.Equal(t, wL0lm, estimator.atDoneL0WriteTokensLinearModel.smoothedLinearModel) require.Equal(t, iL0lm, estimator.atDoneL0IngestTokensLinearModel.smoothedLinearModel) require.Equal(t, ilm, estimator.atDoneIngestTokensLinearModel.smoothedLinearModel) diff --git a/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range index d36a0455822e..d7a28c2ff8bc 100644 --- a/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range +++ b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range @@ -19,7 +19,7 @@ admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r2 origin=n1 log-po # request with the lower create time sorts first despite having the higher log # position. Admission work queues order work based entirely on create-times, # and the assignment of monotonic create-times (WRT log positions) happens only -# within a range and by higher-level components -- kvflowcontrol.Handle. +# within a range by the StoreWorkQueue. 
print ---- physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B diff --git a/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_same_range b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_same_range new file mode 100644 index 000000000000..3de81d4eb0b4 --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_same_range @@ -0,0 +1,52 @@ +# Verify that we ignore create-time based ordering for replicated write +# admission when writes happen within the same range. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Admit two requests, one created at t=5us but with a lower log position. +admit tenant=t1 pri=normal-pri create-time=5us size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +# And one created at t=1us but but higher log position. +admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r1 origin=n1 log-position=4/21 +---- + +# Observe both waiting requests and physical admission stats. Note how the +# request with the lower log position sorts first despite having the higher +# create-time. The StoreWorkQueue sequences them by (ab)using the create-time +# parameter to get this log position ordering. +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=5µs size=1B range=r1 origin=n1 log-position=4/20] + [1: pri=normal-pri create-time=5.001µs size=1B range=r1 origin=n1 log-position=4/21] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 1B worth of regular tokens. +granter class=regular adjust-tokens=+1B +---- +[regular] 1B tokens available +[elastic] 0B tokens available + +# Grant admission to requests. Since we have 1B worth of tokens, and 2 waiting +# requests wanting 1B each, we're only able to admit one. Verify that it's the +# request with the lower log position despite the higher original create-time. +grant class=regular +---- +admitted [tenant=t1 pri=normal-pri create-time=5µs size=1B range=r1 origin=n1 log-position=4/20] + +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=1B + [0: pri=normal-pri create-time=5.001µs size=1B range=r1 origin=n1 log-position=4/21] +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness b/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness index c00240b461f7..3f0d04dbd18a 100644 --- a/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness +++ b/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness @@ -7,8 +7,7 @@ init # For two tenants t1 and t2, try to admit two requests of 1B each at # incrementing log positions. We specify create-times in log-position order for -# work within a given range, similar to what we do at the issuing client -# (kvflowcontrol.Handle). +# work within a given range, similar to what we do at the StoreWorkQueue level. 
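The sequencing that the new testdata above exercises is easy to reproduce in isolation. A hedged usage sketch, using a local copy of the (unexported) sequencer logic from earlier in this diff and the same values as the testdata: a 1µs CreateTime arriving after a 5µs one gets re-sequenced to 5.001µs, preserving log-position order.

```go
package main

import (
	"fmt"
	"time"
)

// sequencer is a local copy of the admission package's (unexported) type for
// illustration: it ratchets to the highest observed CreateTime and bumps by
// 1ns whenever a lower or equal CreateTime is sequenced.
type sequencer struct {
	maxCreateTime int64
}

func (s *sequencer) sequence(createTime int64) int64 {
	if createTime <= s.maxCreateTime {
		createTime = s.maxCreateTime + 1
	}
	s.maxCreateTime = createTime
	return createTime
}

func main() {
	var s sequencer
	t1 := 5 * time.Microsecond // proposal at log position 4/20
	t2 := 1 * time.Microsecond // proposal at log position 4/21, lower CreateTime

	fmt.Println(time.Duration(s.sequence(int64(t1)))) // 5µs
	// The later log position keeps a later sequence number despite its lower
	// CreateTime: 5µs + 1ns = 5.001µs.
	fmt.Println(time.Duration(s.sequence(int64(t2)))) // 5.001µs
}
```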
admit tenant=t1 pri=normal-pri create-time=1.001us size=1B range=r1 origin=n1 log-position=4/20 ---- [regular] try-get=1B available=0B => insufficient tokens diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/testdata/sequencer b/pkg/util/admission/testdata/sequencer similarity index 91% rename from pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/testdata/sequencer rename to pkg/util/admission/testdata/sequencer index ea335538f940..97716928fec2 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/testdata/sequencer +++ b/pkg/util/admission/testdata/sequencer @@ -1,7 +1,8 @@ -# Walk through the basics of how the per-handle sequencer works. The -# log-position= parameter is not actually used in the implementation, but in -# typical usage we'd be sequencing work in log position order, and it's -# instructive to understand how sequencing timestamps are generated. +# Walk through the basics of how the below-raft replicated work sequencer +# works. The log-position= parameter is not actually used in the +# implementation, but in typical usage we'd be sequencing work in log position +# order, and it's instructive to understand how sequencing timestamps are +# generated. # # ----------------------------------------------------------------------------- # 1. Observe how the sequence numbers change with changing log positions (and diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go index 8a97cfa900ca..44c5c0a8cb77 100644 --- a/pkg/util/admission/work_queue.go +++ b/pkg/util/admission/work_queue.go @@ -156,6 +156,14 @@ var epochLIFOQueueDelayThresholdToSwitchToLIFO = settings.RegisterDurationSettin return nil }).WithPublic() +var rangeSequencerGCThreshold = settings.RegisterDurationSetting( + settings.TenantWritable, + "admission.replication_control.range_sequencer_gc_threshold", + "the inactive duration for a range sequencer after it's garbage collected", + 5*time.Minute, + settings.NonNegativeDuration, +) + // WorkInfo provides information that is used to order work within an WorkQueue. // The WorkKind is not included as a field since an WorkQueue deals with a // single WorkKind. @@ -625,6 +633,7 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err info.ReplicatedWorkInfo, info.RequestedCount, info.CreateTime, + false, /* coordMuLocked */ ) } return true, nil @@ -671,6 +680,7 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err } } } + // Check for cancellation. startTime := q.timeNow() if ctx.Err() != nil { @@ -853,6 +863,7 @@ func (q *WorkQueue) granted(grantChainID grantChainID) int64 { item.replicated, item.requestedCount, item.createTime, + true, /* coordMuLocked */ ) q.metrics.incAdmitted(item.priority) @@ -1664,7 +1675,7 @@ func (m *WorkQueueMetrics) getOrCreate(priority admissionpb.WorkPriority) workQu // necessary to call LoadOrStore here as this could be called concurrently. // It is not called the first Load so that we don't have to unnecessarily // create the metrics. - statPrefix := fmt.Sprintf("%v.%v", m.name, admissionpb.WorkPriorityDict[priority]) + statPrefix := fmt.Sprintf("%v.%v", m.name, priority.String()) val, ok = m.byPriority.LoadOrStore(priority, makeWorkQueueMetricsSingle(statPrefix)) if !ok { m.registry.AddMetricStruct(val) @@ -1758,9 +1769,10 @@ type StoreWriteWorkInfo struct { type StoreWorkQueue struct { storeID roachpb.StoreID q [admissionpb.NumWorkClasses]WorkQueue - // Only calls storeWriteDone. 
The rest of the interface is used by + // Only calls storeReplicatedWorkAdmittedLocked. The rest of the interface is used by // WorkQueue. - granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone + granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted + coordMu *syncutil.Mutex mu struct { syncutil.RWMutex // estimates is used to determine how many tokens are deducted at-admit @@ -1773,9 +1785,14 @@ type StoreWorkQueue struct { // and observed L0 growth (which factors in state machine application). stats storeAdmissionStats } - stopCh chan struct{} - timeSource timeutil.TimeSource - settings *cluster.Settings + sequencersMu struct { + syncutil.Mutex + s map[roachpb.RangeID]*sequencer // cleaned up periodically + } + stopCh chan struct{} + timeSource timeutil.TimeSource + settings *cluster.Settings + onLogEntryAdmitted OnLogEntryAdmitted knobs *TestingKnobs } @@ -1824,6 +1841,9 @@ func (q *StoreWorkQueue) Admit( info.RequestedCount = q.mu.estimates.writeTokens q.mu.RUnlock() } + if info.ReplicatedWorkInfo.Enabled { + info.CreateTime = q.sequenceReplicatedWork(info.CreateTime, info.ReplicatedWorkInfo) + } enabled, err := q.q[wc].Admit(ctx, info.WorkInfo) if err != nil { @@ -1868,6 +1888,22 @@ type StoreWorkDoneInfo struct { IngestedBytes int64 } +// storeReplicatedWorkAdmittedInfo provides information about the size of +// replicated work once it's admitted (which happens asynchronously from the +// work itself). This lets us use the underlying linear models for L0 +// {writes,ingests} to deduct an appropriate number of tokens from the granter, +// for the admitted work size. +// +// TODO(irfansharif): This post-admission adjustment of tokens is odd -- when +// the replicated work is being enqueued, we already know its size, so we could +// have applied the linear models upfront and determine what the right # of +// tokens to deduct all at once. We're doing it this way because we've written +// the WorkQueue and granter interactions to be very general, but it can be hard +// to follow. See review discussions over at #97599. It's worth noting that +// there isn't really a lag in the adjustment, so it is harmless from an +// operational perspective of admission control. +type storeReplicatedWorkAdmittedInfo StoreWorkDoneInfo + type onAdmittedReplicatedWork interface { admittedReplicatedWork( tenantID roachpb.TenantID, @@ -1875,7 +1911,10 @@ type onAdmittedReplicatedWork interface { rwi ReplicatedWorkInfo, requestedTokens int64, createTime int64, + coordMuLocked bool, ) + + // TODO(irfansharif): This coordMuLocked parameter is gross. 
} var _ onAdmittedReplicatedWork = &StoreWorkQueue{} @@ -1888,6 +1927,7 @@ func (q *StoreWorkQueue) admittedReplicatedWork( rwi ReplicatedWorkInfo, originalTokens int64, createTime int64, // only used in tests + coordMuLocked bool, ) { if !rwi.Enabled { panic("unexpected call to admittedReplicatedWork for work that's not a replicated write") @@ -1896,11 +1936,11 @@ func (q *StoreWorkQueue) admittedReplicatedWork( fn(tenantID, pri, rwi, originalTokens, createTime) } - var storeWorkDoneInfo StoreWorkDoneInfo + var replicatedWorkAdmittedInfo storeReplicatedWorkAdmittedInfo if rwi.Ingested { - storeWorkDoneInfo.IngestedBytes = originalTokens + replicatedWorkAdmittedInfo.IngestedBytes = originalTokens } else { - storeWorkDoneInfo.WriteBytes = originalTokens + replicatedWorkAdmittedInfo.WriteBytes = originalTokens } // We've already used RequestedCount for replicated writes to deduct tokens @@ -1910,19 +1950,55 @@ func (q *StoreWorkQueue) admittedReplicatedWork( // underlying linear models, and we may have under-deducted -- we account // for this below. wc := admissionpb.WorkClassFromPri(pri) - additionalTokensNeeded := q.granters[wc].storeWriteDone(originalTokens, storeWorkDoneInfo) + if !coordMuLocked { + q.coordMu.Lock() + } + additionalTokensNeeded := q.granters[wc].storeReplicatedWorkAdmittedLocked(originalTokens, replicatedWorkAdmittedInfo) + if !coordMuLocked { + q.coordMu.Unlock() + } q.q[wc].adjustTenantTokens(tenantID, additionalTokensNeeded) - // TODO(irfansharif): Dispatch flow token returns here. We want to - // inform (a) the origin node of writes at (b) a given priority, to - // (c) the given range, at (d) the given log position on (e) the - // local store. Part of #95563. + // Inform callers of the entry we just admitted. // - _ = rwi.Origin // (a) - _ = pri // (b) - _ = rwi.RangeID // (c) - _ = rwi.LogPosition // (d) - _ = q.storeID // (e) + // TODO(irfansharif): It's bad that we're extending coord.mu's critical + // section to this callback. We can't prevent it when this is happening via + // WorkQueue.granted since it was called while holding coord.mu. We should + // revisit -- one possibility is to add this to a notification queue and + // have a separate goroutine invoke these callbacks (without holding + // coord.mu). We could directly invoke here too if not holding the lock. + q.onLogEntryAdmitted.AdmittedLogEntry( + rwi.Origin, + pri, + q.storeID, + rwi.RangeID, + rwi.LogPosition, + ) +} + +// OnLogEntryAdmitted is used to observe the specific entries (identified by +// rangeID + log position) that were admitted. Since admission control for log +// entries is asynchronous/non-blocking, this allows callers to do requisite +// post-admission bookkeeping. +type OnLogEntryAdmitted interface { + AdmittedLogEntry( + origin roachpb.NodeID, /* node where the entry originated */ + pri admissionpb.WorkPriority, /* admission priority of the entry */ + storeID roachpb.StoreID, /* store on which the entry was admitted */ + rangeID roachpb.RangeID, /* identifying range for the log entry */ + pos LogPosition, /* log position of the entry that was admitted*/ + ) +} + +// NoopOnLogEntryAdmitted is a no-op implementation of the OnLogEntryAdmitted +// interface. +type NoopOnLogEntryAdmitted struct{} + +var _ OnLogEntryAdmitted = &NoopOnLogEntryAdmitted{} + +func (n *NoopOnLogEntryAdmitted) AdmittedLogEntry( + roachpb.NodeID, admissionpb.WorkPriority, roachpb.StoreID, roachpb.RangeID, LogPosition, +) { } // AdmittedWorkDone indicates to the queue that the admitted work has completed. 
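For callers that want more than NoopOnLogEntryAdmitted, an implementation only needs to satisfy the interface above. A hypothetical, purely illustrative version that records admitted entries (the real consumer is wired up in the kvflowcontrol dispatch path and is outside this diff; the package name below is made up):

```go
package kvadmissionexample // hypothetical package, for illustration only

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/admission"
	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
)

// loggingOnLogEntryAdmitted records which raft log entries were admitted by
// the store work queues. It exists only to show the shape of the callback.
type loggingOnLogEntryAdmitted struct{}

var _ admission.OnLogEntryAdmitted = (*loggingOnLogEntryAdmitted)(nil)

func (l *loggingOnLogEntryAdmitted) AdmittedLogEntry(
	origin roachpb.NodeID,
	pri admissionpb.WorkPriority,
	storeID roachpb.StoreID,
	rangeID roachpb.RangeID,
	pos admission.LogPosition,
) {
	fmt.Printf("admitted raft entry %v for r%d (pri=%s) on s%d, proposed by n%d\n",
		pos, rangeID, pri, storeID, origin)
}
```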
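The hunks that follow add a per-range sequencer map to the StoreWorkQueue, lazily populated in sequenceReplicatedWork and garbage collected by a ticker once a range has been inactive for longer than admission.replication_control.range_sequencer_gc_threshold. As a standalone sketch of that pattern, with stand-in types and a fixed threshold instead of the cluster setting (see the actual code in the next hunks):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// seq is a stand-in for the per-range sequencer; lastActive approximates the
// role maxCreateTime plays in the real GC check.
type seq struct {
	lastActive time.Time
}

// registry holds lazily created per-range state and collects entries that
// have been inactive longer than gcThreshold, mirroring
// StoreWorkQueue.sequencersMu and gcSequencers.
type registry struct {
	mu          sync.Mutex
	byRange     map[int64]*seq
	gcThreshold time.Duration
}

func newRegistry(gcThreshold time.Duration) *registry {
	return &registry{byRange: make(map[int64]*seq), gcThreshold: gcThreshold}
}

func (r *registry) get(rangeID int64, now time.Time) *seq {
	r.mu.Lock()
	defer r.mu.Unlock()
	s, ok := r.byRange[rangeID]
	if !ok {
		s = &seq{}
		r.byRange[rangeID] = s
	}
	s.lastActive = now
	return s
}

func (r *registry) gc(now time.Time) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for rangeID, s := range r.byRange {
		if now.Sub(s.lastActive) > r.gcThreshold {
			delete(r.byRange, rangeID)
		}
	}
}

func main() {
	r := newRegistry(5 * time.Minute)
	now := time.Now()
	r.get(1, now)
	r.get(2, now.Add(-10*time.Minute)) // stale entry
	r.gc(now)
	fmt.Println(len(r.byRange)) // 1; the inactive sequencer for r2 was collected
}
```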
@@ -2011,11 +2087,13 @@ func (q *StoreWorkQueue) setStoreRequestEstimates(estimates storeRequestEstimate func makeStoreWorkQueue( ambientCtx log.AmbientContext, storeID roachpb.StoreID, - granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, + onLogEntryAdmitted OnLogEntryAdmitted, + coordMu *syncutil.Mutex, ) storeRequester { if knobs == nil { knobs = &TestingKnobs{} @@ -2024,12 +2102,14 @@ func makeStoreWorkQueue( opts.timeSource = timeutil.DefaultTimeSource{} } q := &StoreWorkQueue{ - storeID: storeID, - granters: granters, - knobs: knobs, - stopCh: make(chan struct{}), - timeSource: opts.timeSource, - settings: settings, + coordMu: coordMu, + storeID: storeID, + granters: granters, + knobs: knobs, + stopCh: make(chan struct{}), + timeSource: opts.timeSource, + settings: settings, + onLogEntryAdmitted: onLogEntryAdmitted, } opts.usesAsyncAdmit = true @@ -2042,5 +2122,43 @@ func makeStoreWorkQueue( q.mu.estimates = storeRequestEstimates{ writeTokens: 1, } + + q.sequencersMu.s = make(map[roachpb.RangeID]*sequencer) + go func() { + ticker := time.NewTicker(30 * time.Second) + for { + select { + case <-ticker.C: + q.gcSequencers() + case <-q.stopCh: + return + } + } + }() return q } + +func (q *StoreWorkQueue) gcSequencers() { + q.sequencersMu.Lock() + defer q.sequencersMu.Unlock() + + for rangeID, seq := range q.sequencersMu.s { + maxCreateTime := timeutil.FromUnixNanos(seq.maxCreateTime) + if q.timeSource.Now().Sub(maxCreateTime) > rangeSequencerGCThreshold.Get(&q.settings.SV) { + delete(q.sequencersMu.s, rangeID) + } + } +} + +func (q *StoreWorkQueue) sequenceReplicatedWork(createTime int64, info ReplicatedWorkInfo) int64 { + q.sequencersMu.Lock() + seq, ok := q.sequencersMu.s[info.RangeID] + if !ok { + seq = &sequencer{} + q.sequencersMu.s[info.RangeID] = seq + } + q.sequencersMu.Unlock() + // We're assuming sequenceReplicatedWork is never invoked concurrently for a + // given RangeID. 
+ return seq.sequence(createTime) +} diff --git a/pkg/util/admission/work_queue_test.go b/pkg/util/admission/work_queue_test.go index f4902930be32..66da1387e973 100644 --- a/pkg/util/admission/work_queue_test.go +++ b/pkg/util/admission/work_queue_test.go @@ -67,7 +67,7 @@ type testGranter struct { additionalTokens int64 } -var _ granterWithStoreWriteDone = &testGranter{} +var _ granterWithStoreReplicatedWorkAdmitted = &testGranter{} func (tg *testGranter) grantKind() grantKind { return tg.gk @@ -110,6 +110,14 @@ func (tg *testGranter) storeWriteDone( return tg.additionalTokens } +func (tg *testGranter) storeReplicatedWorkAdmittedLocked( + originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, +) (additionalTokens int64) { + tg.buf.printf("storeReplicatedWorkAdmittedLocked%s: originalTokens %d, admittedBytes(write %d,ingested %d) returning %d", + tg.name, originalTokens, admittedInfo.WriteBytes, admittedInfo.IngestedBytes, tg.additionalTokens) + return tg.additionalTokens +} + type testWork struct { tenantID roachpb.TenantID cancel context.CancelFunc @@ -522,9 +530,13 @@ func TestStoreWorkQueueBasic(t *testing.T) { opts.timeSource = timeutil.NewManualTime(timeutil.FromUnixMicros(0)) opts.disableEpochClosingGoroutine = true st = cluster.MakeTestingClusterSettings() + var mockCoordMu syncutil.Mutex q = makeStoreWorkQueue(log.MakeTestingAmbientContext(tracing.NewTracer()), roachpb.StoreID(1), - [admissionpb.NumWorkClasses]granterWithStoreWriteDone{tg[admissionpb.RegularWorkClass], tg[admissionpb.ElasticWorkClass]}, - st, metrics, opts, nil /* testing knobs */).(*StoreWorkQueue) + [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted{ + tg[admissionpb.RegularWorkClass], + tg[admissionpb.ElasticWorkClass], + }, + st, metrics, opts, nil /* testing knobs */, &NoopOnLogEntryAdmitted{}, &mockCoordMu).(*StoreWorkQueue) tg[admissionpb.RegularWorkClass].r = q.getRequesters()[admissionpb.RegularWorkClass] tg[admissionpb.ElasticWorkClass].r = q.getRequesters()[admissionpb.ElasticWorkClass] wrkMap.resetMap() diff --git a/pkg/workload/kv/kv.go b/pkg/workload/kv/kv.go index 84007fba2733..3eec400c98c0 100644 --- a/pkg/workload/kv/kv.go +++ b/pkg/workload/kv/kv.go @@ -88,6 +88,7 @@ type kv struct { zipfian bool sfuDelay time.Duration splits int + scatter bool secondaryIndex bool shards int targetCompressionRatio float64 @@ -127,6 +128,7 @@ var kvMeta = workload.Meta{ `span-limit`: {RuntimeOnly: true}, `del-percent`: {RuntimeOnly: true}, `splits`: {RuntimeOnly: true}, + `scatter`: {RuntimeOnly: true}, `timeout`: {RuntimeOnly: true}, } g.flags.IntVar(&g.batchSize, `batch`, 1, @@ -157,6 +159,8 @@ var kvMeta = workload.Meta{ `previous --sequential run and R implies a previous random run.`) g.flags.IntVar(&g.splits, `splits`, 0, `Number of splits to perform before starting normal operations.`) + g.flags.BoolVar(&g.scatter, `scatter`, false, + `Scatter ranges before starting normal operations.`) g.flags.BoolVar(&g.secondaryIndex, `secondary-index`, false, `Add a secondary index to the schema.`) g.flags.IntVar(&g.shards, `num-shards`, 0, @@ -189,13 +193,20 @@ func (w *kv) Flags() workload.Flags { return w.flags } func (w *kv) Hooks() workload.Hooks { return workload.Hooks{ PostLoad: func(_ context.Context, db *gosql.DB) error { - if !w.enum { - return nil - } - _, err := db.Exec(` + if w.enum { + _, err := db.Exec(` CREATE TYPE enum_type AS ENUM ('v'); ALTER TABLE kv ADD COLUMN e enum_type NOT NULL AS ('v') STORED;`) - return err + if err != nil { + return err + } + } + if w.scatter { + if 
_, err := db.Exec(`ALTER TABLE kv SCATTER`); err != nil { + return err + } + } + return nil }, Validate: w.validateConfig, } diff --git a/pkg/workload/pgx_helpers.go b/pkg/workload/pgx_helpers.go index 16d04725e7ee..5bbff6266a08 100644 --- a/pkg/workload/pgx_helpers.go +++ b/pkg/workload/pgx_helpers.go @@ -214,7 +214,7 @@ func NewMultiConnPool( } if err := m.WarmupConns(ctx, cfg.WarmupConns); err != nil { - return nil, err + log.Warningf(ctx, "warming up connection pool failed (%v), continuing workload", err) } return m, nil @@ -252,18 +252,15 @@ func (m *MultiConnPool) Method() pgx.QueryExecMode { return m.method } -// WarmupConns warms up numConns connections across all pools contained within -// MultiConnPool. The max number of connections are warmed up if numConns is -// set to 0. -func (m *MultiConnPool) WarmupConns(ctx context.Context, numConns int) error { - if numConns < 0 { +// WarmupConns warms up totalNumConns connections distributed across all pools +// contained within MultiConnPool. The max number of connections are warmed up +// if totalNumConns is set to 0. If totalNumConns is less than 0, no +// pre-warming of connections is performed. +func (m *MultiConnPool) WarmupConns(ctx context.Context, totalNumConns int) error { + if totalNumConns < 0 { return nil } - // NOTE(seanc@): see context cancellation note below. - warmupCtx, cancel := context.WithCancel(ctx) - defer cancel() - // "Warm up" the pools so we don't have to establish connections later (which // would affect the observed latencies of the first requests, especially when // prepared statements are used). We do this by @@ -279,32 +276,57 @@ func (m *MultiConnPool) WarmupConns(ctx context.Context, numConns int) error { // (128). g.SetLimit(100) - var warmupConnsPerPool []int - if numConns == 0 { - warmupConnsPerPool = make([]int, len(m.Pools)) - for i, p := range m.Pools { - warmupConnsPerPool[i] = int(p.Config().MaxConns) + type warmupPool struct { + maxConns int + pool *pgxpool.Pool + } + + warmupPools := make([]warmupPool, len(m.Pools)) + var numWarmupConns int + numConnsPerPool := distribute(totalNumConns, len(m.Pools)) + for i, p := range m.Pools { + poolMaxConns := int(p.Config().MaxConns) + + // Tune max conns for the pool + switch { + case totalNumConns == 0 && poolMaxConns > 0: + warmupPools[i].maxConns = poolMaxConns + case totalNumConns == 0: + warmupPools[i].maxConns = 1 // always at least one connection + default: + warmupPools[i].maxConns = numConnsPerPool[i] } - } else { - warmupConnsPerPool = distribute(numConns, len(m.Pools)) - for i, p := range m.Pools { - poolMaxConns := int(p.Config().MaxConns) - if warmupConnsPerPool[i] > poolMaxConns { - warmupConnsPerPool[i] = poolMaxConns - } + + // Clamp max conns per pool + if warmupPools[i].maxConns > poolMaxConns { + warmupPools[i].maxConns = poolMaxConns } + + warmupPools[i].pool = p + numWarmupConns += warmupPools[i].maxConns } - var numWarmupConns int - for _, n := range warmupConnsPerPool { - numWarmupConns += n + // NOTE(seanc@): see context cancellation note below. 
+ // TODO(seanc@): Change WithTimeout() back to WithCancel() + const maxWarmupTime = 5 * time.Minute // NOTE(seanc@): 5min == AWS NLB TCP idle time + const minWarmupTime = 15 * time.Second + const maxTimePerConn = 200 * time.Millisecond + warmupTime := minWarmupTime + if int(warmupTime) < numWarmupConns*int(maxTimePerConn) { + warmupTime = time.Duration(numWarmupConns * int(maxTimePerConn)) } + if warmupTime > maxWarmupTime { + warmupTime = maxWarmupTime + } + ctx, cancel := context.WithTimeout(ctx, warmupTime) + defer cancel() + warmupConns := make(chan *pgxpool.Conn, numWarmupConns) - for i, p := range m.Pools { + for _, p := range warmupPools { p := p - for k := 0; k < warmupConnsPerPool[i]; k++ { + for i := 0; i < p.maxConns; i++ { g.Go(func() error { - conn, err := p.Acquire(warmupCtx) + conn, err := p.pool.Acquire(ctx) if err != nil { return err } @@ -317,23 +339,16 @@ func (m *MultiConnPool) WarmupConns(ctx context.Context, numConns int) error { estConns := make([]*pgxpool.Conn, 0, numWarmupConns) defer func() { for _, conn := range estConns { - // NOTE(seanc@): Release() connections before canceling the warmupCtx to - // prevent partially established connections from being Acquire()'ed. conn.Release() } }() -WARMUP: for i := 0; i < numWarmupConns; i++ { select { case conn := <-warmupConns: estConns = append(estConns, conn) - case <-warmupCtx.Done(): - if err := warmupCtx.Err(); err != nil { - return err - } - - break WARMUP + case <-ctx.Done(): + return ctx.Err() } }
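The warmup deadline introduced here is derived from the number of connections being warmed: roughly maxTimePerConn per connection, floored at minWarmupTime and capped at maxWarmupTime (the 5 minute AWS NLB TCP idle timeout noted in the comment). An equivalent formulation of that clamp, using time.Duration arithmetic instead of the int casts above:

```go
package main

import (
	"fmt"
	"time"
)

// warmupTimeout mirrors the budget computation in WarmupConns: allow
// maxTimePerConn per connection, but never less than minWarmupTime and never
// more than maxWarmupTime.
func warmupTimeout(numWarmupConns int) time.Duration {
	const (
		maxWarmupTime  = 5 * time.Minute
		minWarmupTime  = 15 * time.Second
		maxTimePerConn = 200 * time.Millisecond
	)
	warmupTime := minWarmupTime
	if perConn := time.Duration(numWarmupConns) * maxTimePerConn; perConn > warmupTime {
		warmupTime = perConn
	}
	if warmupTime > maxWarmupTime {
		warmupTime = maxWarmupTime
	}
	return warmupTime
}

func main() {
	fmt.Println(warmupTimeout(10))   // 15s (floor)
	fmt.Println(warmupTimeout(400))  // 1m20s (400 * 200ms)
	fmt.Println(warmupTimeout(5000)) // 5m0s (cap)
}
```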