From 416433a139ed8b1f6b38b98dc71df6481d2de419 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Sat, 20 Mar 2021 15:28:24 +0100 Subject: [PATCH 01/11] cli/demo: label the URLs consistently with 'start' Before: ``` Connection parameters: (console) http://127.0.0.1:8080/demologin?password=demo63578&username=demo (sql) postgres://demo:demo63578@?host=%2Ftmp%2Fdemo916006859&port=26257 (sql/tcp) postgres://demo:demo63578@127.0.0.1:26257?sslmode=require ``` After: ``` Connection parameters: (webui) http://127.0.0.1:8080/demologin?password=demo66299&username=demo (sql) postgres://demo:demo66299@127.0.0.1:26257?sslmode=require (sql/unix) postgres://demo:demo66299@?host=%2Ftmp%2Fdemo869742428&port=26257 ``` Release note (cli change): The prefixes displayed before connection URLs when `cockroach demo` starts up have been updated to better align with the output of `cockroach start`. --- pkg/cli/demo_cluster.go | 16 ++++---- pkg/cli/interactive_tests/test_demo.tcl | 54 ++++++++++++------------- 2 files changed, 35 insertions(+), 35 deletions(-) diff --git a/pkg/cli/demo_cluster.go b/pkg/cli/demo_cluster.go index eb120c00fd46..30af91e6448e 100644 --- a/pkg/cli/demo_cluster.go +++ b/pkg/cli/demo_cluster.go @@ -882,7 +882,7 @@ func (c *transientCluster) listDemoNodes(w io.Writer, justOne bool) { } serverURL := s.Cfg.AdminURL() if !demoCtx.insecure { - // Print node ID and console URL. Embed the autologin feature inside the URL. + // Print node ID and web UI URL. Embed the autologin feature inside the URL. // We avoid printing those when insecure, as the autologin path is not available // in that case. pwauth := url.Values{ @@ -892,18 +892,18 @@ func (c *transientCluster) listDemoNodes(w io.Writer, justOne bool) { serverURL.Path = server.DemoLoginPath serverURL.RawQuery = pwauth.Encode() } - fmt.Fprintf(w, " (console) %s\n", serverURL) - // Print unix socket if defined. 
- if c.useSockets { - sock := c.sockForServer(nodeID) - fmt.Fprintln(w, " (sql) ", sock) - } + fmt.Fprintln(w, " (webui) ", serverURL) // Print network URL if defined. netURL, err := c.getNetworkURLForServer(i, nil, false /*includeAppName*/) if err != nil { fmt.Fprintln(stderr, errors.Wrap(err, "retrieving network URL")) } else { - fmt.Fprintln(w, " (sql/tcp)", netURL) + fmt.Fprintln(w, " (sql) ", netURL) + } + // Print unix socket if defined. + if c.useSockets { + sock := c.sockForServer(nodeID) + fmt.Fprintln(w, " (sql/unix)", sock) } fmt.Fprintln(w) } diff --git a/pkg/cli/interactive_tests/test_demo.tcl b/pkg/cli/interactive_tests/test_demo.tcl index 417187c933a8..c2fcb00e6394 100644 --- a/pkg/cli/interactive_tests/test_demo.tcl +++ b/pkg/cli/interactive_tests/test_demo.tcl @@ -9,7 +9,7 @@ eexpect "Welcome" # Warn the user that they won't get persistence. eexpect "your changes to data stored in the demo session will not be saved!" # Inform the necessary URL. -eexpect "(console)" +eexpect "(webui)" eexpect "http:" # Ensure same messages as cockroach sql. eexpect "Server version" @@ -34,16 +34,16 @@ eexpect "defaultdb>" # Show the URLs. # Also check that the default port is used. send "\\demo ls\r" -eexpect "(console)" +eexpect "(webui)" eexpect "http://" eexpect ":8080" eexpect "(sql)" -eexpect "root:unused@" -eexpect "=26257" -eexpect "(sql/tcp)" eexpect "root@" eexpect ":26257" eexpect "sslmode=disable" +eexpect "(sql/unix)" +eexpect "root:unused@" +eexpect "=26257" eexpect "defaultdb>" interrupt @@ -57,13 +57,13 @@ eexpect "defaultdb>" # Show the URLs. send "\\demo ls\r" -eexpect "(console)" +eexpect "(webui)" eexpect "http://" eexpect "(sql)" -eexpect "root:unused@" -eexpect "(sql/tcp)" eexpect "root@" eexpect "sslmode=disable" +eexpect "(sql/unix)" +eexpect "root:unused@" eexpect "defaultdb>" interrupt @@ -85,16 +85,16 @@ eexpect "defaultdb>" # Show the URLs. # Also check that the default port is used. 
send "\\demo ls\r" -eexpect "(console)" +eexpect "(webui)" eexpect "http://" eexpect ":8080" eexpect "(sql)" eexpect "demo:" -eexpect "=26257" -eexpect "(sql/tcp)" -eexpect "demo:" eexpect ":26257" eexpect "sslmode=require" +eexpect "(sql/unix)" +eexpect "demo:" +eexpect "=26257" eexpect "defaultdb>" interrupt @@ -110,13 +110,13 @@ eexpect "defaultdb>" # Show the URLs. send "\\demo ls\r" -eexpect "(console)" +eexpect "(webui)" eexpect "http://" eexpect "(sql)" eexpect "demo:" -eexpect "(sql/tcp)" -eexpect "demo:" eexpect "sslmode=require" +eexpect "(sql/unix)" +eexpect "demo:" eexpect "defaultdb>" interrupt @@ -130,7 +130,7 @@ spawn $argv demo --insecure --empty # Check that we see our message. eexpect "Connection parameters" eexpect "(sql)" -eexpect "(sql/tcp)" +eexpect "(sql/unix)" expect root@ send_eof eexpect eof @@ -141,19 +141,19 @@ spawn $argv demo --insecure --nodes 3 --empty # Check that we get a message for each node. eexpect "Connection parameters" eexpect "(sql)" -eexpect "(sql/tcp)" +eexpect "(sql/unix)" eexpect "defaultdb>" send "\\demo ls\r" eexpect "node 1" eexpect "(sql)" -eexpect "(sql/tcp)" +eexpect "(sql/unix)" eexpect "node 2" eexpect "(sql)" -eexpect "(sql/tcp)" +eexpect "(sql/unix)" eexpect "node 3" eexpect "(sql)" -eexpect "(sql/tcp)" +eexpect "(sql/unix)" eexpect "defaultdb>" send_eof @@ -161,7 +161,7 @@ eexpect eof spawn $argv demo --insecure=false --empty # Expect that security related tags are part of the connection URL. -eexpect "(sql/tcp)" +eexpect "(sql)" eexpect "sslmode=require" eexpect "defaultdb>" @@ -196,17 +196,17 @@ eexpect "defaultdb>" # Show the URLs. 
 send "\\demo ls\r"
 eexpect "(sql)"
-eexpect "=23000"
-eexpect "(sql/tcp)"
 eexpect ":23000"
+eexpect "(sql/unix)"
+eexpect "=23000"
 eexpect "(sql)"
-eexpect "=23001"
-eexpect "(sql/tcp)"
 eexpect ":23001"
+eexpect "(sql/unix)"
+eexpect "=23001"
 eexpect "(sql)"
-eexpect "=23002"
-eexpect "(sql/tcp)"
 eexpect ":23002"
+eexpect "(sql/unix)"
+eexpect "=23002"
 eexpect "defaultdb>"

 interrupt

From 0e4e5667e6daa5c301c3d84c0a799eec3b6675f5 Mon Sep 17 00:00:00 2001
From: Raphael 'kena' Poss
Date: Sat, 20 Mar 2021 15:14:52 +0100
Subject: [PATCH 02/11] cli/demo: `--empty` -> `--no-example-database`; add env var

The flag `--empty` was not, in fact, creating an empty cluster; the
two base databases `defaultdb` and `postgres` are still created.
Therefore the name of the flag was misleading.

What the flag is really doing is to disable the generation of the
example database powered by the selected workload.

Release note (cli change): The flag `--empty` for `cockroach demo`
has been renamed to `--no-example-database`. The previous form of the
flag is still recognized but is marked as deprecated. Additionally,
the user can now set the env var `COCKROACH_NO_EXAMPLE_DATABASE` to
obtain this behavior automatically in every new demo session.
--- pkg/cli/cli_test.go | 4 ++-- pkg/cli/cliflags/flags.go | 12 +++++++++--- pkg/cli/context.go | 4 ++-- pkg/cli/demo.go | 16 ++++++++-------- pkg/cli/flags.go | 4 +++- pkg/cli/interactive_tests/test_demo.tcl | 18 +++++++++--------- pkg/cli/interactive_tests/test_demo_global.tcl | 2 +- pkg/cli/interactive_tests/test_dump_sig.tcl | 2 +- pkg/cli/interactive_tests/test_flags.tcl | 4 ++-- pkg/cli/interactive_tests/test_local_cmds.tcl | 4 ++-- pkg/cli/interactive_tests/test_notice.tcl | 2 +- pkg/cmd/smithtest/main.go | 2 +- 12 files changed, 41 insertions(+), 33 deletions(-) diff --git a/pkg/cli/cli_test.go b/pkg/cli/cli_test.go index e1431bae0ea4..881e6a13d53d 100644 --- a/pkg/cli/cli_test.go +++ b/pkg/cli/cli_test.go @@ -417,7 +417,7 @@ func Example_demo() { testData := [][]string{ {`demo`, `-e`, `show database`}, - {`demo`, `-e`, `show database`, `--empty`}, + {`demo`, `-e`, `show database`, `--no-example-database`}, {`demo`, `-e`, `show application_name`}, {`demo`, `--format=table`, `-e`, `show database`}, {`demo`, `-e`, `select 1 as "1"`, `-e`, `select 3 as "3"`}, @@ -448,7 +448,7 @@ func Example_demo() { // demo -e show database // database // movr - // demo -e show database --empty + // demo -e show database --no-example-database // database // defaultdb // demo -e show application_name diff --git a/pkg/cli/cliflags/flags.go b/pkg/cli/cliflags/flags.go index 4d870896d3c1..cd2c50acfe16 100644 --- a/pkg/cli/cliflags/flags.go +++ b/pkg/cli/cliflags/flags.go @@ -1186,10 +1186,16 @@ If set, disable cockroach demo from attempting to obtain a temporary license.`, } UseEmptyDatabase = FlagInfo{ - Name: "empty", + Name: "empty", + Description: `Deprecated in favor of --no-example-database`, + } + + NoExampleDatabase = FlagInfo{ + Name: "no-example-database", + EnvVar: "COCKROACH_NO_EXAMPLE_DATABASE", Description: ` -Start with an empty database: avoid pre-loading a default dataset in -the demo shell.`, +Disable the creation of a default dataset in the demo shell. 
+This makes 'cockroach demo' faster to start.`, } GeoLibsDir = FlagInfo{ diff --git a/pkg/cli/context.go b/pkg/cli/context.go index 7c450d3c3ec3..fad6147a4f5a 100644 --- a/pkg/cli/context.go +++ b/pkg/cli/context.go @@ -567,7 +567,7 @@ var demoCtx struct { cacheSize int64 disableTelemetry bool disableLicenseAcquisition bool - useEmptyDatabase bool + noExampleDatabase bool runWorkload bool localities demoLocalityList geoPartitionedReplicas bool @@ -585,7 +585,7 @@ func setDemoContextDefaults() { demoCtx.nodes = 1 demoCtx.sqlPoolMemorySize = 128 << 20 // 128MB, chosen to fit 9 nodes on 2GB machine. demoCtx.cacheSize = 64 << 20 // 64MB, chosen to fit 9 nodes on 2GB machine. - demoCtx.useEmptyDatabase = false + demoCtx.noExampleDatabase = false demoCtx.simulateLatency = false demoCtx.runWorkload = false demoCtx.localities = nil diff --git a/pkg/cli/demo.go b/pkg/cli/demo.go index 7c07ed7b5dce..ca219890d8c7 100644 --- a/pkg/cli/demo.go +++ b/pkg/cli/demo.go @@ -36,7 +36,7 @@ Start an in-memory, standalone, single-node CockroachDB instance, and open an interactive SQL prompt to it. Various datasets are available to be preloaded as subcommands: e.g. "cockroach demo startrek". See --help for a full list. -By default, the 'movr' dataset is pre-loaded. You can also use --empty +By default, the 'movr' dataset is pre-loaded. You can also use --no-example-database to avoid pre-loading a dataset. cockroach demo attempts to connect to a Cockroach Labs server to obtain a @@ -175,14 +175,14 @@ func incrementTelemetryCounters(cmd *cobra.Command) { func checkDemoConfiguration( cmd *cobra.Command, gen workload.Generator, ) (workload.Generator, error) { - if gen == nil && !demoCtx.useEmptyDatabase { - // Use a default dataset unless prevented by --empty. + if gen == nil && !demoCtx.noExampleDatabase { + // Use a default dataset unless prevented by --no-example-database. gen = defaultGenerator } // Make sure that the user didn't request a workload and an empty database. 
-	if demoCtx.runWorkload && demoCtx.useEmptyDatabase {
-		return nil, errors.New("cannot run a workload against an empty database")
+	if demoCtx.runWorkload && demoCtx.noExampleDatabase {
+		return nil, errors.New("cannot run a workload when generation of the example database is disabled")
 	}

 	// Make sure the number of nodes is valid.
@@ -208,9 +208,9 @@ func checkDemoConfiguration(
 			return nil, errors.Newf("enterprise features are needed for this demo (%s)", geoFlag)
 		}

-		// Make sure that the user didn't request to have a topology and an empty database.
-		if demoCtx.useEmptyDatabase {
-			return nil, errors.New("cannot setup geo-partitioned replicas topology on an empty database")
+		// Make sure that the user didn't request to have a topology and disable the example database.
+		if demoCtx.noExampleDatabase {
+			return nil, errors.New("cannot setup geo-partitioned replicas topology without generating an example database")
 		}

 		// Make sure that the Movr database is selected when automatically partitioning.
diff --git a/pkg/cli/flags.go b/pkg/cli/flags.go
index ce13aa0f7745..954d42a5f69e 100644
--- a/pkg/cli/flags.go
+++ b/pkg/cli/flags.go
@@ -780,7 +780,9 @@ func init() {
 	_ = f.MarkHidden(cliflags.Global.Name)
 	// The --empty flag is only valid for the top level demo command,
 	// so we use the regular flag set.
-	boolFlag(demoCmd.Flags(), &demoCtx.useEmptyDatabase, cliflags.UseEmptyDatabase)
+	boolFlag(demoCmd.Flags(), &demoCtx.noExampleDatabase, cliflags.UseEmptyDatabase)
+	_ = f.MarkDeprecated(cliflags.UseEmptyDatabase.Name, "use --no-example-database")
+	boolFlag(demoCmd.Flags(), &demoCtx.noExampleDatabase, cliflags.NoExampleDatabase)
 	// We also support overriding the GEOS library path for 'demo'.
// Even though the demoCtx uses mostly different configuration // variables from startCtx, this is one case where we afford diff --git a/pkg/cli/interactive_tests/test_demo.tcl b/pkg/cli/interactive_tests/test_demo.tcl index c2fcb00e6394..c18cab11bc91 100644 --- a/pkg/cli/interactive_tests/test_demo.tcl +++ b/pkg/cli/interactive_tests/test_demo.tcl @@ -27,7 +27,7 @@ start_test "Check that demo insecure says hello properly" # With env var. set ::env(COCKROACH_INSECURE) "true" -spawn $argv demo --empty +spawn $argv demo --no-example-database eexpect "Welcome" eexpect "defaultdb>" @@ -51,7 +51,7 @@ eexpect eof # With command-line override. set ::env(COCKROACH_INSECURE) "false" -spawn $argv demo --insecure=true --empty +spawn $argv demo --insecure=true --no-example-database eexpect "Welcome" eexpect "defaultdb>" @@ -76,7 +76,7 @@ start_test "Check that demo secure says hello properly" # With env var. set ::env(COCKROACH_INSECURE) "false" -spawn $argv demo --empty +spawn $argv demo --no-example-database eexpect "Welcome" eexpect "The user \"demo\" with password" eexpect "has been created." @@ -102,7 +102,7 @@ eexpect eof # With command-line override. set ::env(COCKROACH_INSECURE) "true" -spawn $argv demo --insecure=false --empty +spawn $argv demo --insecure=false --no-example-database eexpect "Welcome" eexpect "The user \"demo\" with password" eexpect "has been created." @@ -126,7 +126,7 @@ end_test # Test that demo displays connection URLs for nodes in the cluster. start_test "Check that node URLs are displayed" -spawn $argv demo --insecure --empty +spawn $argv demo --insecure --no-example-database # Check that we see our message. eexpect "Connection parameters" eexpect "(sql)" @@ -136,7 +136,7 @@ send_eof eexpect eof # Start the test again with a multi node cluster. -spawn $argv demo --insecure --nodes 3 --empty +spawn $argv demo --insecure --nodes 3 --no-example-database # Check that we get a message for each node. 
eexpect "Connection parameters" @@ -159,7 +159,7 @@ eexpect "defaultdb>" send_eof eexpect eof -spawn $argv demo --insecure=false --empty +spawn $argv demo --insecure=false --no-example-database # Expect that security related tags are part of the connection URL. eexpect "(sql)" eexpect "sslmode=require" @@ -172,7 +172,7 @@ end_test start_test "Check that the port numbers can be overridden from the command line." -spawn $argv demo --empty --nodes 3 --http-port 8000 +spawn $argv demo --no-example-database --nodes 3 --http-port 8000 eexpect "Welcome" eexpect "defaultdb>" @@ -189,7 +189,7 @@ eexpect "defaultdb>" interrupt eexpect eof -spawn $argv demo --empty --nodes 3 --sql-port 23000 +spawn $argv demo --no-example-database --nodes 3 --sql-port 23000 eexpect "Welcome" eexpect "defaultdb>" diff --git a/pkg/cli/interactive_tests/test_demo_global.tcl b/pkg/cli/interactive_tests/test_demo_global.tcl index e8b0c77ec368..d094a035530e 100644 --- a/pkg/cli/interactive_tests/test_demo_global.tcl +++ b/pkg/cli/interactive_tests/test_demo_global.tcl @@ -8,7 +8,7 @@ set timeout 90 start_test "Check --global flag runs as expected" # Start a demo with --global set -spawn $argv demo --empty --nodes 9 --global +spawn $argv demo --no-example-database --nodes 9 --global # Ensure db is defaultdb. eexpect "defaultdb>" diff --git a/pkg/cli/interactive_tests/test_dump_sig.tcl b/pkg/cli/interactive_tests/test_dump_sig.tcl index a2c644e4550d..6327d0b8bec8 100644 --- a/pkg/cli/interactive_tests/test_dump_sig.tcl +++ b/pkg/cli/interactive_tests/test_dump_sig.tcl @@ -25,7 +25,7 @@ eexpect ":/# " end_test start_test "Check that the client also can generate goroutine dumps." -send "$argv demo --empty\r" +send "$argv demo --no-example-database\r" eexpect root@ # Dump goroutines in server. 
system "killall -QUIT `basename \$(realpath $argv)`" diff --git a/pkg/cli/interactive_tests/test_flags.tcl b/pkg/cli/interactive_tests/test_flags.tcl index bb434f749dc4..31145312165d 100644 --- a/pkg/cli/interactive_tests/test_flags.tcl +++ b/pkg/cli/interactive_tests/test_flags.tcl @@ -80,13 +80,13 @@ eexpect {Failed running "start"} end_test start_test "Check that demo start-up flags are reported to telemetry" -send "$argv demo --empty --echo-sql --logtostderr=WARNING\r" +send "$argv demo --no-example-database --echo-sql --logtostderr=WARNING\r" eexpect "defaultdb>" send "SELECT * FROM crdb_internal.feature_usage WHERE feature_name LIKE 'cli.demo.%' ORDER BY 1;\r" eexpect feature_name eexpect "cli.demo.explicitflags.echo-sql" -eexpect "cli.demo.explicitflags.empty" eexpect "cli.demo.explicitflags.logtostderr" +eexpect "cli.demo.explicitflags.no-example-database" eexpect "cli.demo.runs" eexpect "defaultdb>" interrupt diff --git a/pkg/cli/interactive_tests/test_local_cmds.tcl b/pkg/cli/interactive_tests/test_local_cmds.tcl index 8982afebfea9..f1cb909bfd11 100755 --- a/pkg/cli/interactive_tests/test_local_cmds.tcl +++ b/pkg/cli/interactive_tests/test_local_cmds.tcl @@ -290,7 +290,7 @@ stop_server $argv start_test "Check that client-side options can be overridden with set" # First establish a baseline with all the defaults. -send "$argv demo --empty\r" +send "$argv demo --no-example-database\r" eexpect root@ send "\\set display_format csv\r" send "\\set\r" @@ -305,7 +305,7 @@ interrupt eexpect ":/# " # Then verify that the defaults can be overridden. 
-send "$argv demo --empty --set=auto_trace=on --set=check_syntax=false --set=echo=true --set=errexit=true --set=prompt1=%n@haa --set=show_times=false\r" +send "$argv demo --no-example-database --set=auto_trace=on --set=check_syntax=false --set=echo=true --set=errexit=true --set=prompt1=%n@haa --set=show_times=false\r" eexpect root@ send "\\set display_format csv\r" send "\\set\r" diff --git a/pkg/cli/interactive_tests/test_notice.tcl b/pkg/cli/interactive_tests/test_notice.tcl index 7951b2acfa02..103832ad193f 100644 --- a/pkg/cli/interactive_tests/test_notice.tcl +++ b/pkg/cli/interactive_tests/test_notice.tcl @@ -4,7 +4,7 @@ source [file join [file dirname $argv0] common.tcl] # This test ensures notices are being sent as expected. -spawn $argv demo --empty +spawn $argv demo --no-example-database eexpect root@ start_test "Test that notices always appear at the end after all results." diff --git a/pkg/cmd/smithtest/main.go b/pkg/cmd/smithtest/main.go index fedaa0686147..cab05b540b5c 100644 --- a/pkg/cmd/smithtest/main.go +++ b/pkg/cmd/smithtest/main.go @@ -260,7 +260,7 @@ func (s WorkerSetup) run(ctx context.Context, rnd *rand.Rand) error { // If we can't ping, check if the statement caused a panic. 
if err := db.PingContext(ctx); err != nil { input := fmt.Sprintf("%s; %s;", initSQL, stmt) - out, _ := exec.CommandContext(ctx, s.cockroach, "demo", "--empty", "-e", input).CombinedOutput() + out, _ := exec.CommandContext(ctx, s.cockroach, "demo", "--no-example-database", "-e", input).CombinedOutput() var pqerr pq.Error if match := stackRE.FindStringSubmatch(string(out)); match != nil { pqerr.Detail = strings.TrimSpace(match[1]) From 40739328098731d8c660e32bf0be605475e110e2 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Mon, 22 Mar 2021 17:18:00 -0400 Subject: [PATCH 03/11] cli: support addr 0 in multi-node demo Before this change, if you specified that the ports should be chosen by the system with `--http-port 0 --sql-port 0` then it would not work for multi-node demo. This fixes that. Release note: None --- pkg/cli/demo_cluster.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/cli/demo_cluster.go b/pkg/cli/demo_cluster.go index 30af91e6448e..970d1d97d50a 100644 --- a/pkg/cli/demo_cluster.go +++ b/pkg/cli/demo_cluster.go @@ -340,7 +340,13 @@ func testServerArgsForTransientCluster( // Unit tests can be run with multiple processes side-by-side with // `make stress`. This is bound to not work with fixed ports. sqlPort := sqlBasePort + int(nodeID) - 1 + if sqlBasePort == 0 { + sqlPort = 0 + } httpPort := httpBasePort + int(nodeID) - 1 + if httpBasePort == 0 { + httpPort = 0 + } args.SQLAddr = fmt.Sprintf(":%d", sqlPort) args.HTTPAddr = fmt.Sprintf(":%d", httpPort) } From 57fba3817c47c9c249f7509bbb078df8144f1df0 Mon Sep 17 00:00:00 2001 From: Jordan Lewis Date: Fri, 26 Mar 2021 18:02:34 -0300 Subject: [PATCH 04/11] cli: fix demo --global latencies Previously, the --global flag to demo had some inconsistent behavior: some of the time, the actual "global" latencies were double what we would expect. The reason for this was that we were accidentally only introducing a delay on one of the sides of the connection, some of the time. 
The fix was to change a method to use a pointer receiver :) Thanks to @alf_the_n00b for noticing this one-character mistake. Release note (cli change): fix the artificial latencies introduced by the --global flag to demo. --- pkg/cli/demo.go | 21 +++++++++++++++------ pkg/rpc/context.go | 22 +++++++++++++++------- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/pkg/cli/demo.go b/pkg/cli/demo.go index ca219890d8c7..db6dee152a4a 100644 --- a/pkg/cli/demo.go +++ b/pkg/cli/demo.go @@ -105,14 +105,23 @@ func insertPair(pair regionPair, latency int) { regionToLatency[pair.regionB] = latency } +// Round-trip latencies collected from http://cloudping.co on 2019-09-11. +var regionRoundTripLatencies = map[regionPair]int{ + {regionA: "us-east1", regionB: "us-west1"}: 66, + {regionA: "us-east1", regionB: "europe-west1"}: 64, + {regionA: "us-west1", regionB: "europe-west1"}: 146, +} + +var regionOneWayLatencies = make(map[regionPair]int) + func init() { + // We record one-way latencies next, because the logic in our delayingConn + // and delayingListener is in terms of one-way network delays. + for pair, latency := range regionRoundTripLatencies { + regionOneWayLatencies[pair] = latency / 2 + } regionToRegionToLatency = make(map[string]map[string]int) - // Latencies collected from http://cloudping.co on 2019-09-11. - for pair, latency := range map[regionPair]int{ - {regionA: "us-east1", regionB: "us-west1"}: 66, - {regionA: "us-east1", regionB: "europe-west1"}: 64, - {regionA: "us-west1", regionB: "europe-west1"}: 146, - } { + for pair, latency := range regionOneWayLatencies { insertPair(pair, latency) insertPair(regionPair{ regionA: pair.regionB, diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 0e5f0f81d7c9..80cf8875e751 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -838,7 +838,7 @@ func (ald *artificialLatencyDialer) dial(ctx context.Context, addr string) (net. 
if err != nil { return conn, err } - return delayingConn{ + return &delayingConn{ Conn: conn, latency: time.Duration(ald.latencyMS) * time.Millisecond, readBuf: new(bytes.Buffer), @@ -859,7 +859,7 @@ func (d delayingListener) Accept() (net.Conn, error) { if err != nil { return nil, err } - return delayingConn{ + return &delayingConn{ Conn: c, // Put a default latency as the server's conn. This value will get populated // as packets are exchanged across the delayingConnections. @@ -868,6 +868,16 @@ func (d delayingListener) Accept() (net.Conn, error) { }, nil } +// delayingConn is a wrapped net.Conn that introduces a fixed delay into all +// writes to the connection. The implementation works by specifying a timestamp +// at which the other end of the connection is allowed to read the data, and +// sending that timestamp across the network in a header packet. On the read +// side, a sleep until the timestamp is introduced after the data is read before +// the data is returned to the consumer. +// +// Note that the fixed latency here is a one-way latency, so if you want to +// simulate a round-trip latency of x milliseconds, you should use a delayingConn +// on both ends with x/2 milliseconds of latency. type delayingConn struct { net.Conn latency time.Duration @@ -894,7 +904,7 @@ func (d delayingConn) Write(b []byte) (n int, err error) { return n, err } -func (d delayingConn) Read(b []byte) (n int, err error) { +func (d *delayingConn) Read(b []byte) (n int, err error) { if d.readBuf.Len() == 0 { var hdr delayingHeader if err := binary.Read(d.Conn, binary.BigEndian, &hdr); err != nil { @@ -903,11 +913,9 @@ func (d delayingConn) Read(b []byte) (n int, err error) { // If we somehow don't get our expected magic, throw an error. if hdr.Magic != magic { panic(errors.New("didn't get expected magic bytes header")) - // TODO (rohany): I can't get this to work. 
I suspect that the problem - // is with that maybe the improperly parsed struct is not written back - // into the same binary format that it was read as. I tried this with sending - // the magic integer over first and saw the same thing. } else { + // Once we receive our first packet, we set our delay to the expected + // delay that was sent on the write side. d.latency = time.Duration(hdr.DelayMS) * time.Millisecond defer func() { time.Sleep(timeutil.Until(timeutil.Unix(0, hdr.ReadTime))) From 98e42c34c3e7faf49af3abcd6c7a7784799f73f2 Mon Sep 17 00:00:00 2001 From: Jordan Lewis Date: Sun, 28 Mar 2021 18:59:54 -0300 Subject: [PATCH 05/11] cli: fix \demo shutdown with --global Previously, it crashed due to not setting up the demo's rpc context to include the artificial latency map. Release note: None --- pkg/cli/start.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/cli/start.go b/pkg/cli/start.go index 641931485a6b..80718edb50c8 100644 --- a/pkg/cli/start.go +++ b/pkg/cli/start.go @@ -1075,6 +1075,9 @@ func getClientGRPCConn( Stopper: stopper, Settings: cfg.Settings, }) + if cfg.TestingKnobs.Server != nil { + rpcContext.Knobs = cfg.TestingKnobs.Server.(*server.TestingKnobs).ContextTestingKnobs + } addr, err := addrWithDefaultHost(cfg.AdvertiseAddr) if err != nil { stopper.Stop(ctx) From bcc9ce9b6d407573c70bba3d916b2fc7807853ef Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Fri, 2 Apr 2021 17:45:10 +0200 Subject: [PATCH 06/11] server,testutils: ensure server start is friendly to async cancellation This commit ensures that the server Start() method is properly sensitive to its context cancellation and possible async interruption by its stopper. 
Release note: None --- pkg/cli/demo_cluster.go | 6 ++--- pkg/server/connectivity_test.go | 3 ++- pkg/server/server.go | 15 ++++++++++- pkg/server/testing_knobs.go | 9 ++++--- pkg/server/testserver.go | 3 +-- .../reduce/reducesql/reducesql_test.go | 2 +- pkg/testutils/serverutils/test_server_shim.go | 6 ++--- pkg/testutils/testcluster/testcluster.go | 27 +++++++++++-------- 8 files changed, 45 insertions(+), 26 deletions(-) diff --git a/pkg/cli/demo_cluster.go b/pkg/cli/demo_cluster.go index 970d1d97d50a..93f6f67a038f 100644 --- a/pkg/cli/demo_cluster.go +++ b/pkg/cli/demo_cluster.go @@ -193,7 +193,7 @@ func (c *transientCluster) start( // the start routine needs to wait for the latency map construction after their RPC address has been computed. if demoCtx.simulateLatency { go func(i int) { - if err := serv.Start(); err != nil { + if err := serv.Start(ctx); err != nil { errCh <- err } else { // Block until the ReadyFn has been called before continuing. @@ -203,7 +203,7 @@ func (c *transientCluster) start( }(i) <-servRPCReadyCh } else { - if err := serv.Start(); err != nil { + if err := serv.Start(ctx); err != nil { return err } // Block until the ReadyFn has been called before continuing. 
@@ -516,7 +516,7 @@ func (c *transientCluster) RestartNode(nodeID roachpb.NodeID) error { close(readyCh) } - if err := serv.Start(); err != nil { + if err := serv.Start(context.Background()); err != nil { return err } diff --git a/pkg/server/connectivity_test.go b/pkg/server/connectivity_test.go index f474365d8097..869d5d4ca5ea 100644 --- a/pkg/server/connectivity_test.go +++ b/pkg/server/connectivity_test.go @@ -333,7 +333,8 @@ func TestJoinVersionGate(t *testing.T) { } defer serv.Stop() - if err := serv.Start(); !errors.Is(errors.Cause(err), server.ErrIncompatibleBinaryVersion) { + ctx := context.Background() + if err := serv.Start(ctx); !errors.Is(errors.Cause(err), server.ErrIncompatibleBinaryVersion) { t.Fatalf("expected error %s, got %v", server.ErrIncompatibleBinaryVersion.Error(), err.Error()) } } diff --git a/pkg/server/server.go b/pkg/server/server.go index 2a7f65b65de4..20a0bf6e75dc 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1284,7 +1284,20 @@ func (s *Server) PreStart(ctx context.Context) error { close(knobs.SignalAfterGettingRPCAddress) } if knobs.PauseAfterGettingRPCAddress != nil { - <-knobs.PauseAfterGettingRPCAddress + select { + case <-knobs.PauseAfterGettingRPCAddress: + // Normal case. Just continue below. + + case <-ctx.Done(): + // Test timeout or some other condition in the caller, by which + // we are instructed to stop. + return ctx.Err() + + case <-s.stopper.ShouldQuiesce(): + // The server is instructed to stop before it even finished + // starting up. 
+ return nil + } } } diff --git a/pkg/server/testing_knobs.go b/pkg/server/testing_knobs.go index e7cc9dac6f9d..b22c0886be22 100644 --- a/pkg/server/testing_knobs.go +++ b/pkg/server/testing_knobs.go @@ -29,12 +29,13 @@ type TestingKnobs struct { DefaultZoneConfigOverride *zonepb.ZoneConfig // DefaultSystemZoneConfigOverride, if set, overrides the default system zone config defined in `pkg/config/zone.go` DefaultSystemZoneConfigOverride *zonepb.ZoneConfig - // PauseAfterGettingRPCAddress, if non-nil, instructs the server to wait until - // the channel is closed after getting an RPC serving address. - PauseAfterGettingRPCAddress chan struct{} // SignalAfterGettingRPCAddress, if non-nil, is closed after the server gets - // an RPC server address. + // an RPC server address, and prior to waiting on PauseAfterGettingRPCAddress below. SignalAfterGettingRPCAddress chan struct{} + // PauseAfterGettingRPCAddress, if non-nil, instructs the server to wait until + // the channel is closed after determining its RPC serving address, and after + // closing SignalAfterGettingRPCAddress. + PauseAfterGettingRPCAddress chan struct{} // ContextTestingKnobs allows customization of the RPC context testing knobs. ContextTestingKnobs rpc.ContextTestingKnobs // DiagnosticsTestingKnobs allows customization of diagnostics testing knobs. diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 082bf105583b..0add96df15da 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -454,8 +454,7 @@ func (ts *TestServer) NodeDialer() *nodedialer.Dialer { // TestServer.ServingRPCAddr() after Start() for client connections. // Use TestServer.Stopper().Stop() to shutdown the server after the test // completes. 
-func (ts *TestServer) Start() error { - ctx := context.Background() +func (ts *TestServer) Start(ctx context.Context) error { return ts.Server.Start(ctx) } diff --git a/pkg/testutils/reduce/reducesql/reducesql_test.go b/pkg/testutils/reduce/reducesql/reducesql_test.go index 601aaf6a3f8a..058ac0e2b071 100644 --- a/pkg/testutils/reduce/reducesql/reducesql_test.go +++ b/pkg/testutils/reduce/reducesql/reducesql_test.go @@ -47,7 +47,7 @@ func isInterestingSQL(contains string) reduce.InterestingFn { } serv := ts.(*server.TestServer) defer serv.Stopper().Stop(ctx) - if err := serv.Start(); err != nil { + if err := serv.Start(context.Background()); err != nil { panic(err) } diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go index 911fbe3473c0..d294028467ff 100644 --- a/pkg/testutils/serverutils/test_server_shim.go +++ b/pkg/testutils/serverutils/test_server_shim.go @@ -47,7 +47,7 @@ import ( type TestServerInterface interface { Stopper() *stop.Stopper - Start() error + Start(context.Context) error // Node returns the server.Node as an interface{}. Node() interface{} @@ -256,7 +256,7 @@ func StartServer( if err != nil { t.Fatalf("%+v", err) } - if err := server.Start(); err != nil { + if err := server.Start(context.Background()); err != nil { t.Fatalf("%+v", err) } goDB := OpenDBConn( @@ -324,7 +324,7 @@ func StartServerRaw(args base.TestServerArgs) (TestServerInterface, error) { if err != nil { return nil, err } - if err := server.Start(); err != nil { + if err := server.Start(context.Background()); err != nil { return nil, err } return server, nil diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index 5b5e551b3968..5462567eb424 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -462,7 +462,7 @@ func (tc *TestCluster) AddServer(serverArgs base.TestServerArgs) (*server.TestSe // actually starting the server. 
func (tc *TestCluster) startServer(idx int, serverArgs base.TestServerArgs) error { server := tc.Servers[idx] - if err := server.Start(); err != nil { + if err := server.Start(context.Background()); err != nil { return err } @@ -1306,17 +1306,22 @@ func (tc *TestCluster) RestartServerWithInspect(idx int, inspect func(s *server. } s := srv.(*server.TestServer) + ctx := context.Background() if err := func() error { - tc.mu.Lock() - defer tc.mu.Unlock() - tc.Servers[idx] = s - tc.mu.serverStoppers[idx] = s.Stopper() - - if inspect != nil { - inspect(s) - } + func() { + // Only lock the assignment of the server and the stopper and the call to the inspect function. + // This ensures that the stopper's Stop() method can abort an async Start() call. + tc.mu.Lock() + defer tc.mu.Unlock() + tc.Servers[idx] = s + tc.mu.serverStoppers[idx] = s.Stopper() + + if inspect != nil { + inspect(s) + } + }() - if err := srv.Start(); err != nil { + if err := srv.Start(ctx); err != nil { return err } @@ -1336,7 +1341,7 @@ func (tc *TestCluster) RestartServerWithInspect(idx int, inspect func(s *server. // different port, and a cycle of gossip is necessary to make all other nodes // aware. return contextutil.RunWithTimeout( - context.Background(), "check-conn", 15*time.Second, + ctx, "check-conn", 15*time.Second, func(ctx context.Context) error { r := retry.StartWithCtx(ctx, retry.Options{ InitialBackoff: 1 * time.Millisecond, From 36331e641a73115396932ac0d67339366a955e5a Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Sat, 3 Apr 2021 13:15:46 +0200 Subject: [PATCH 07/11] cli/demo: refactor the server initialization The latency simulation code needs to be injected as a latency map while the servers are initialized, i.e. concurrently with server startup. The previous initialization code for `cockroach demo` to achieve this was exceedly difficult to understand and was, in fact, incorrect. 
This commit reworks this code by exposing the overall workings as a comment and then ensuring the structure of the comment follows the explanation. It also add logging. Additionally, this change ensures that the same initialization code is used regardless of whether latency simulation is requested or not. Release note: None Co-authored-by: Raphael 'kena' Poss Co-authored-by: Oliver Tan --- pkg/cli/demo_cluster.go | 573 ++++++++++++++++++++++++++++----------- pkg/cli/sql.go | 2 +- pkg/server/server.go | 7 +- pkg/server/testserver.go | 6 + 4 files changed, 433 insertions(+), 155 deletions(-) diff --git a/pkg/cli/demo_cluster.go b/pkg/cli/demo_cluster.go index 93f6f67a038f..789a35468d84 100644 --- a/pkg/cli/demo_cluster.go +++ b/pkg/cli/demo_cluster.go @@ -37,23 +37,24 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/catconstants" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/severity" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/cockroach/pkg/workload/histogram" "github.com/cockroachdb/cockroach/pkg/workload/workloadsql" "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" "github.com/spf13/cobra" "golang.org/x/time/rate" ) type transientCluster struct { - connURL string - demoDir string - useSockets bool - stopper *stop.Stopper - s *server.TestServer - servers []*server.TestServer + connURL string + demoDir string + useSockets bool + stopper *stop.Stopper + firstServer *server.TestServer + servers []*server.TestServer httpFirstPort int sqlFirstPort int @@ -119,184 +120,427 @@ func (c *transientCluster) checkConfigAndSetupLogging( func (c *transientCluster) start( ctx context.Context, cmd *cobra.Command, gen workload.Generator, ) (err error) { - serverFactory := server.TestServerFactory - var servers 
[]*server.TestServer + ctx = logtags.AddTag(ctx, "start-demo-cluster", nil) + // We now proceed to start all the nodes concurrently. This is + // somewhat a complicated dance. + // + // On the one hand, we use a concurrent start, because the latency + // map needs to be initialized after all the nodes have started + // listening on the network, but before they proceed to initialize + // their RPC context. + // + // On the other hand, we cannot use full concurrency either, because + // we need to wait until the *first* node has started + // listening on the network before we can start the next nodes. + // + // So we proceed in phases, as follows: + // + // 1. create and start the first node asynchronously. + // 2. wait for the first node to listen on RPC and determine its + // listen addr; OR wait for an error in the first node initialization. + // 3. create and start all the other nodes asynchronously. + // 4. wait for all the nodes to listen on RPC, OR wait for an error + // from any node. + // 5. if no error, proceed to initialize the latency map. + // 6. in sequence, let each node initialize then wait for RPC readiness OR error from them. + // This ensures the node IDs are assigned sequentially. + // 7. wait for the SQL readiness from all nodes. + // 8. after all nodes are initialized, initialize SQL and telemetry. + // - // latencyMapWaitCh is used to block test servers after RPC address computation until the artificial - // latency map has been constructed. - latencyMapWaitCh := make(chan struct{}) + timeoutCh := time.After(maxNodeInitTime) - // errCh is used to catch all errors when initializing servers. - // Sending a nil on this channel indicates success. + // errCh is used to catch errors when initializing servers. errCh := make(chan error, demoCtx.nodes) - for i := 0; i < demoCtx.nodes; i++ { - // All the nodes connect to the address of the first server created. 
- var joinAddr string - if i != 0 { - joinAddr = c.s.ServingRPCAddr() + // rpcAddrReadyChs will be used in steps 2 and 4 below + // to wait until the nodes know their RPC address. + rpcAddrReadyChs := make([]chan struct{}, demoCtx.nodes) + + // latencyMapWaitChs is used to block test servers after RPC address + // computation until the artificial latency map has been constructed. + latencyMapWaitChs := make([]chan struct{}, demoCtx.nodes) + + // Step 1: create the first node. + { + phaseCtx := logtags.AddTag(ctx, "phase", 1) + log.Infof(phaseCtx, "creating the first node") + + latencyMapWaitChs[0] = make(chan struct{}) + firstRPCAddrReadyCh, err := c.createAndAddNode(phaseCtx, 0, latencyMapWaitChs[0], timeoutCh) + if err != nil { + return err + } + rpcAddrReadyChs[0] = firstRPCAddrReadyCh + } + + // Step 2: start the first node asynchronously, then wait for RPC + // listen readiness or error. + { + phaseCtx := logtags.AddTag(ctx, "phase", 2) + + log.Infof(phaseCtx, "starting first node") + if err := c.startNodeAsync(phaseCtx, 0, errCh, timeoutCh); err != nil { + return err } - nodeID := roachpb.NodeID(i + 1) - args := testServerArgsForTransientCluster( - c.sockForServer(nodeID), nodeID, joinAddr, c.demoDir, - c.sqlFirstPort, - c.httpFirstPort, - c.stickyEngineRegistry, - ) - if i == 0 { - // The first node also auto-inits the cluster. - args.NoAutoInitializeCluster = false + log.Infof(phaseCtx, "waiting for first node RPC address") + if err := c.waitForRPCAddrReadinessOrError(phaseCtx, 0, errCh, rpcAddrReadyChs, timeoutCh); err != nil { + return err } + } - // servRPCReadyCh is used if latency simulation is requested to notify that a test server has - // successfully computed its RPC address. - servRPCReadyCh := make(chan struct{}) + // Step 3: create the other nodes and start them asynchronously. 
+ { + phaseCtx := logtags.AddTag(ctx, "phase", 3) + log.Infof(phaseCtx, "starting other nodes") - if demoCtx.simulateLatency { - serverKnobs := args.Knobs.Server.(*server.TestingKnobs) - serverKnobs.PauseAfterGettingRPCAddress = latencyMapWaitCh - serverKnobs.SignalAfterGettingRPCAddress = servRPCReadyCh - serverKnobs.ContextTestingKnobs = rpc.ContextTestingKnobs{ - ArtificialLatencyMap: make(map[string]int), + for i := 1; i < demoCtx.nodes; i++ { + latencyMapWaitChs[i] = make(chan struct{}) + rpcAddrReady, err := c.createAndAddNode(phaseCtx, i, latencyMapWaitChs[i], timeoutCh) + if err != nil { + return err } + rpcAddrReadyChs[i] = rpcAddrReady } - s, err := serverFactory.New(args) - if err != nil { - return err - } - serv := s.(*server.TestServer) - c.stopper.AddCloser(stop.CloserFn(serv.Stop)) - if i == 0 { - c.s = serv - // The first node connects its Settings instance to the `log` - // package for crash reporting. - // - // There's a known shortcoming with this approach: restarting - // node 1 using the \demo commands will break this connection: - // if the user changes the cluster setting after restarting node - // 1, the `log` package will not see this change. - // - // TODO(knz): re-connect the `log` package every time the first - // node is restarted and gets a new `Settings` instance. - settings.SetCanonicalValuesContainer(&serv.ClusterSettings().SV) - } - servers = append(servers, serv) - - // We force a wait for all servers until they are ready. - servReadyFnCh := make(chan struct{}) - serv.Cfg.ReadyFn = func(bool) { - close(servReadyFnCh) - } - - // If latency simulation is requested, start the servers in a background thread. We do this because - // the start routine needs to wait for the latency map construction after their RPC address has been computed. - if demoCtx.simulateLatency { - go func(i int) { - if err := serv.Start(ctx); err != nil { - errCh <- err - } else { - // Block until the ReadyFn has been called before continuing. 
- <-servReadyFnCh - errCh <- nil - } - }(i) - <-servRPCReadyCh - } else { - if err := serv.Start(ctx); err != nil { + // Ensure we close all sticky stores we've created when the stopper + // instructs the entire cluster to stop. We do this only here + // because we want this closer to be registered after all the + // individual servers' Stop() methods have been registered + // via createAndAddNode() above. + c.stopper.AddCloser(stop.CloserFn(func() { + c.stickyEngineRegistry.CloseAllStickyInMemEngines() + })) + + // Start the remaining nodes asynchronously. + for i := 1; i < demoCtx.nodes; i++ { + if err := c.startNodeAsync(phaseCtx, i, errCh, timeoutCh); err != nil { return err } - // Block until the ReadyFn has been called before continuing. - <-servReadyFnCh - errCh <- nil } } - // Ensure we close all sticky stores we've created. - c.stopper.AddCloser(stop.CloserFn(func() { - c.stickyEngineRegistry.CloseAllStickyInMemEngines() - })) - c.servers = servers + // Step 4: wait for all the nodes to know their RPC address, + // or for an error or premature shutdown. + { + phaseCtx := logtags.AddTag(ctx, "phase", 4) + log.Infof(phaseCtx, "waiting for remaining nodes to get their RPC address") - if demoCtx.simulateLatency { - // Now, all servers have been started enough to know their own RPC serving - // addresses, but nothing else. Assemble the artificial latency map. 
- for i, src := range servers { - latencyMap := src.Cfg.TestingKnobs.Server.(*server.TestingKnobs).ContextTestingKnobs.ArtificialLatencyMap - srcLocality, ok := src.Cfg.Locality.Find("region") - if !ok { - continue - } - srcLocalityMap, ok := regionToRegionToLatency[srcLocality] - if !ok { - continue + for i := 0; i < demoCtx.nodes; i++ { + if err := c.waitForRPCAddrReadinessOrError(phaseCtx, i, errCh, rpcAddrReadyChs, timeoutCh); err != nil { + return err } - for j, dst := range servers { - if i == j { + } + } + + // Step 5: optionally initialize the latency map, then let the servers + // proceed with their initialization. + + { + phaseCtx := logtags.AddTag(ctx, "phase", 5) + + // If latency simulation is requested, initialize the latency map. + if demoCtx.simulateLatency { + // Now, all servers have been started enough to know their own RPC serving + // addresses, but nothing else. Assemble the artificial latency map. + log.Infof(phaseCtx, "initializing latency map") + for i, serv := range c.servers { + latencyMap := serv.Cfg.TestingKnobs.Server.(*server.TestingKnobs).ContextTestingKnobs.ArtificialLatencyMap + srcLocality, ok := serv.Cfg.Locality.Find("region") + if !ok { continue } - dstLocality, ok := dst.Cfg.Locality.Find("region") + srcLocalityMap, ok := regionToRegionToLatency[srcLocality] if !ok { continue } - latency := srcLocalityMap[dstLocality] - latencyMap[dst.ServingRPCAddr()] = latency + for j, dst := range c.servers { + if i == j { + continue + } + dstLocality, ok := dst.Cfg.Locality.Find("region") + if !ok { + continue + } + latency := srcLocalityMap[dstLocality] + latencyMap[dst.ServingRPCAddr()] = latency + } } } } - // We've assembled our latency maps and are ready for all servers to proceed - // through bootstrapping. 
- close(latencyMapWaitCh) + { + phaseCtx := logtags.AddTag(ctx, "phase", 6) + + for i := 0; i < demoCtx.nodes; i++ { + log.Infof(phaseCtx, "letting server %d initialize", i) + close(latencyMapWaitChs[i]) + if err := c.waitForNodeIDReadiness(phaseCtx, i, errCh, timeoutCh); err != nil { + return err + } + log.Infof(phaseCtx, "node n%d initialized", c.servers[i].NodeID()) + } + } - // Wait for all servers to respond. { - timeRemaining := maxNodeInitTime - lastUpdateTime := timeutil.Now() - var err error + phaseCtx := logtags.AddTag(ctx, "phase", 7) + for i := 0; i < demoCtx.nodes; i++ { - select { - case e := <-errCh: - err = errors.CombineErrors(err, e) - case <-time.After(timeRemaining): - return errors.New("failed to setup transientCluster in time") + log.Infof(phaseCtx, "waiting for server %d SQL readiness", i) + if err := c.waitForSQLReadiness(phaseCtx, i, errCh, timeoutCh); err != nil { + return err } - updateTime := timeutil.Now() - timeRemaining -= updateTime.Sub(lastUpdateTime) - lastUpdateTime = updateTime + log.Infof(phaseCtx, "node n%d ready", c.servers[i].NodeID()) + } + } + + { + phaseCtx := logtags.AddTag(ctx, "phase", 8) + + // Run the SQL initialization. This takes care of setting up the + // initial replication factor for small clusters and creating the + // admin user. + log.Infof(phaseCtx, "running initial SQL for demo cluster") + + const demoUsername = "demo" + demoPassword := genDemoPassword(demoUsername) + if err := runInitialSQL(phaseCtx, c.firstServer.Server, demoCtx.nodes < 3, demoUsername, demoPassword); err != nil { + return err } + if demoCtx.insecure { + c.adminUser = security.RootUserName() + c.adminPassword = "unused" + } else { + c.adminUser = security.MakeSQLUsernameFromPreNormalizedString(demoUsername) + c.adminPassword = demoPassword + } + + // Prepare the URL for use by the SQL shell. + c.connURL, err = c.getNetworkURLForServer(0, gen, true /* includeAppName */) if err != nil { return err } + + // Start up the update check loop. 
+ // We don't do this in (*server.Server).Start() because we don't want this + // overhead and possible interference in tests. + if !demoCtx.disableTelemetry { + log.Infof(phaseCtx, "starting telemetry") + c.firstServer.StartDiagnostics(phaseCtx) + } } + return nil +} - // Run the SQL initialization. This takes care of setting up the - // initial replication factor for small clusters and creating the - // admin user. - const demoUsername = "demo" - demoPassword := genDemoPassword(demoUsername) - if err := runInitialSQL(ctx, c.s.Server, demoCtx.nodes < 3, demoUsername, demoPassword); err != nil { - return err +// createAndAddNode is responsible for determining node parameters, +// instantiating the server component and connecting it to the +// cluster's stopper. +// +// The caller is responsible for calling createAndAddNode() with idx 0 +// synchronously, then startNodeAsync(), then +// waitForNodeRPCListener(), before using createAndAddNode() with +// other indexes. +func (c *transientCluster) createAndAddNode( + ctx context.Context, idx int, latencyMapWaitCh chan struct{}, timeoutCh <-chan time.Time, +) (rpcAddrReadyCh chan struct{}, err error) { + var joinAddr string + if idx > 0 { + // The caller is responsible for ensuring that the method + // is not called before the first server has finished + // computing its RPC listen address. + joinAddr = c.firstServer.ServingRPCAddr() } - if demoCtx.insecure { - c.adminUser = security.RootUserName() - c.adminPassword = "unused" - } else { - c.adminUser = security.MakeSQLUsernameFromPreNormalizedString(demoUsername) - c.adminPassword = demoPassword + nodeID := roachpb.NodeID(idx + 1) + args := testServerArgsForTransientCluster( + c.sockForServer(nodeID), nodeID, joinAddr, c.demoDir, + c.sqlFirstPort, + c.httpFirstPort, + c.stickyEngineRegistry, + ) + if idx == 0 { + // The first node also auto-inits the cluster. 
+ args.NoAutoInitializeCluster = false + } + + serverKnobs := args.Knobs.Server.(*server.TestingKnobs) + + // SignalAfterGettingRPCAddress will be closed by the server startup routine + // once it has determined its RPC address. + rpcAddrReadyCh = make(chan struct{}) + serverKnobs.SignalAfterGettingRPCAddress = rpcAddrReadyCh + + // The server will wait until PauseAfterGettingRPCAddress is closed + // after it has signaled SignalAfterGettingRPCAddress, and before + // it continues the startup routine. + serverKnobs.PauseAfterGettingRPCAddress = latencyMapWaitCh + + if demoCtx.simulateLatency { + // The latency map will be populated after all servers have + // started listening on RPC, and before they proceed with their + // startup routine. + serverKnobs.ContextTestingKnobs = rpc.ContextTestingKnobs{ + ArtificialLatencyMap: make(map[string]int), + } } - // Prepare the URL for use by the SQL shell. - c.connURL, err = c.getNetworkURLForServer(0, gen, true /* includeAppName */) + // Create the server instance. This also registers the in-memory store + // into the sticky engine registry. + s, err := server.TestServerFactory.New(args) if err != nil { + return nil, err + } + serv := s.(*server.TestServer) + + // Ensure that this server gets stopped when the top level demo + // stopper instructs the cluster to stop. + c.stopper.AddCloser(stop.CloserFn(serv.Stop)) + + if idx == 0 { + // Remember the first server for later use by other APIs on + // transientCluster. + c.firstServer = serv + // The first node connects its Settings instance to the `log` + // package for crash reporting. + // + // There's a known shortcoming with this approach: restarting + // node 1 using the \demo commands will break this connection: + // if the user changes the cluster setting after restarting node + // 1, the `log` package will not see this change. + // + // TODO(knz): re-connect the `log` package every time the first + // node is restarted and gets a new `Settings` instance. 
+ settings.SetCanonicalValuesContainer(&serv.ClusterSettings().SV) + } + + // Remember this server for the stop/restart primitives in the SQL + // shell. + c.servers = append(c.servers, serv) + + return rpcAddrReadyCh, nil +} + +// startNodeAsync starts the node initialization asynchronously. +func (c *transientCluster) startNodeAsync( + ctx context.Context, idx int, errCh chan error, timeoutCh <-chan time.Time, +) error { + if idx > len(c.servers) { + return errors.AssertionFailedf("programming error: server %d not created yet", idx) + } + + serv := c.servers[idx] + tag := fmt.Sprintf("start-n%d", idx+1) + return c.stopper.RunAsyncTask(ctx, tag, func(ctx context.Context) { + ctx = logtags.AddTag(ctx, tag, nil) + err := serv.Start(ctx) + if err != nil { + log.Warningf(ctx, "server %d failed to start: %v", idx, err) + select { + case errCh <- err: + + // Don't block if we are shutting down. + case <-ctx.Done(): + case <-serv.Stopper().ShouldQuiesce(): + case <-c.stopper.ShouldQuiesce(): + case <-timeoutCh: + } + } + }) +} + +// waitForRPCAddrReadinessOrError waits until the given server knows its +// RPC address or fails to initialize. +func (c *transientCluster) waitForRPCAddrReadinessOrError( + ctx context.Context, + idx int, + errCh chan error, + rpcAddrReadyChs []chan struct{}, + timeoutCh <-chan time.Time, +) error { + if idx > len(rpcAddrReadyChs) || idx > len(c.servers) { + return errors.AssertionFailedf("programming error: server %d not created yet", idx) + } + + select { + case <-rpcAddrReadyChs[idx]: + // This server knows its RPC address. Proceed with the next phase. + return nil + + // If we are asked for an early shutdown by the cases below or a + // server startup failure, detect it here. 
+ case err := <-errCh: return err + case <-timeoutCh: + return errors.Newf("demo startup timeout while waiting for server %d", idx) + case <-ctx.Done(): + return errors.CombineErrors(ctx.Err(), errors.Newf("server %d startup aborted due to context cancellation", idx)) + case <-c.servers[idx].Stopper().ShouldQuiesce(): + return errors.Newf("server %d stopped prematurely", idx) + case <-c.stopper.ShouldQuiesce(): + return errors.Newf("demo cluster stopped prematurely while starting server %d", idx) + } +} + +// waitForNodeIDReadiness waits until the given server reports it knows its node ID. +func (c *transientCluster) waitForNodeIDReadiness( + ctx context.Context, idx int, errCh chan error, timeoutCh <-chan time.Time, +) error { + retryOpts := retry.Options{InitialBackoff: 10 * time.Millisecond, MaxBackoff: time.Second} + for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { + log.Infof(ctx, "waiting for server %d to know its node ID", idx) + select { + // Errors or premature shutdown. + case <-timeoutCh: + return errors.Newf("initialization timeout while waiting for server %d node ID", idx) + case err := <-errCh: + return errors.Wrapf(err, "server %d failed to start", idx) + case <-ctx.Done(): + return errors.CombineErrors(errors.Newf("context cancellation while waiting for server %d to have a node ID", idx), ctx.Err()) + case <-c.servers[idx].Stopper().ShouldQuiesce(): + return errors.Newf("server %s shut down prematurely", idx) + case <-c.stopper.ShouldQuiesce(): + return errors.Newf("demo cluster shut down prematurely while waiting for server %d to have a node ID", idx) + + default: + if c.servers[idx].NodeID() == 0 { + log.Infof(ctx, "server %d does not know its node ID yet", idx) + continue + } else { + log.Infof(ctx, "server %d: n%d", idx, c.servers[idx].NodeID()) + } + } + break } + return nil +} - // Start up the update check loop. 
- // We don't do this in (*server.Server).Start() because we don't want this - // overhead and possible interference in tests. - if !demoCtx.disableTelemetry { - c.s.StartDiagnostics(ctx) +// waitForSQLReadiness waits until the given server reports it is +// healthy and ready to accept SQL clients. +func (c *transientCluster) waitForSQLReadiness( + baseCtx context.Context, idx int, errCh chan error, timeoutCh <-chan time.Time, +) error { + retryOpts := retry.Options{InitialBackoff: 10 * time.Millisecond, MaxBackoff: time.Second} + for r := retry.StartWithCtx(baseCtx, retryOpts); r.Next(); { + ctx := logtags.AddTag(baseCtx, "n", c.servers[idx].NodeID()) + log.Infof(ctx, "waiting for server %d to become ready", idx) + select { + // Errors or premature shutdown. + case <-timeoutCh: + return errors.Newf("initialization timeout while waiting for server %d readiness", idx) + case err := <-errCh: + return errors.Wrapf(err, "server %d failed to start", idx) + case <-ctx.Done(): + return errors.CombineErrors(errors.Newf("context cancellation while waiting for server %d to become ready", idx), ctx.Err()) + case <-c.servers[idx].Stopper().ShouldQuiesce(): + return errors.Newf("server %s shut down prematurely", idx) + case <-c.stopper.ShouldQuiesce(): + return errors.Newf("demo cluster shut down prematurely while waiting for server %d to become ready", idx) + default: + if err := c.servers[idx].Readiness(ctx); err != nil { + log.Infof(ctx, "server %d not yet ready: %v", idx, err) + continue + } + } + break } return nil } @@ -429,7 +673,7 @@ func (c *transientCluster) Recommission(nodeID roachpb.NodeID) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - adminClient, finish, err := getAdminClient(ctx, *(c.s.Cfg)) + adminClient, finish, err := getAdminClient(ctx, *(c.firstServer.Cfg)) if err != nil { return err } @@ -454,7 +698,7 @@ func (c *transientCluster) Decommission(nodeID roachpb.NodeID) error { ctx, cancel := 
context.WithCancel(context.Background()) defer cancel() - adminClient, finish, err := getAdminClient(ctx, *(c.s.Cfg)) + adminClient, finish, err := getAdminClient(ctx, *(c.firstServer.Cfg)) if err != nil { return err } @@ -502,7 +746,10 @@ func (c *transientCluster) RestartNode(nodeID roachpb.NodeID) error { } // TODO(#42243): re-compute the latency mapping. - args := testServerArgsForTransientCluster(c.sockForServer(nodeID), nodeID, c.s.ServingRPCAddr(), c.demoDir, + // TODO(...): the RPC address of the first server may not be available + // if the first server was shut down. + args := testServerArgsForTransientCluster(c.sockForServer(nodeID), nodeID, + c.firstServer.ServingRPCAddr(), c.demoDir, c.sqlFirstPort, c.httpFirstPort, c.stickyEngineRegistry) s, err := server.TestServerFactory.New(args) if err != nil { @@ -535,7 +782,7 @@ func (c *transientCluster) RestartNode(nodeID roachpb.NodeID) error { // AddNode create a new node in the cluster and start it. // This function uses RestartNode to perform the actual node // starting. -func (c *transientCluster) AddNode(localityString string) error { +func (c *transientCluster) AddNode(ctx context.Context, localityString string) error { // '\demo add' accepts both strings that are quoted and not quoted. To properly make use of // quoted strings, strip off the quotes. Before we do that though, make sure that the quotes match, // or that there aren't any quotes in the string. 
@@ -757,17 +1004,23 @@ func (c *transientCluster) runWorkload( log.Warningf(ctx, "error running workload query: %+v", err) } select { - case <-c.s.Stopper().ShouldQuiesce(): + case <-c.firstServer.Stopper().ShouldQuiesce(): + return + case <-ctx.Done(): + log.Warningf(ctx, "workload terminating from context cancellation: %v", ctx.Err()) + return + case <-c.stopper.ShouldQuiesce(): + log.Warningf(ctx, "demo cluster shutting down") return default: } } } } - // As the SQL shell is tied to `c.s`, this means we want to tie the workload + // As the SQL shell is tied to `c.firstServer`, this means we want to tie the workload // onto this as we want the workload to stop when the server dies, // rather than the cluster. Otherwise, interrupts on cockroach demo hangs. - if err := c.s.Stopper().RunAsyncTask(ctx, "workload", workloadFun(workerFn)); err != nil { + if err := c.firstServer.Stopper().RunAsyncTask(ctx, "workload", workloadFun(workerFn)); err != nil { return err } } @@ -797,17 +1050,33 @@ func (c *transientCluster) acquireDemoLicense(ctx context.Context) (chan error, go func() { defer db.Close() - success, err := GetAndApplyLicense(db, c.s.ClusterID(), demoOrg) + success, err := GetAndApplyLicense(db, c.firstServer.ClusterID(), demoOrg) if err != nil { - licenseDone <- err + select { + case licenseDone <- err: + + // Avoid waiting on the license channel write if the + // server or cluster is shutting down. + case <-ctx.Done(): + case <-c.firstServer.Stopper().ShouldQuiesce(): + case <-c.stopper.ShouldQuiesce(): + } return } if !success { if demoCtx.geoPartitionedReplicas { - licenseDone <- errors.WithDetailf( + select { + case licenseDone <- errors.WithDetailf( errors.New("unable to acquire a license for this demo"), "Enterprise features are needed for this demo (--%s).", - cliflags.DemoGeoPartitionedReplicas.Name) + cliflags.DemoGeoPartitionedReplicas.Name): + + // Avoid waiting on the license channel write if the + // server or cluster is shutting down. 
+ case <-ctx.Done(): + case <-c.firstServer.Stopper().ShouldQuiesce(): + case <-c.stopper.ShouldQuiesce(): + } return } } diff --git a/pkg/cli/sql.go b/pkg/cli/sql.go index a63f38508aaa..494cf45906bf 100644 --- a/pkg/cli/sql.go +++ b/pkg/cli/sql.go @@ -569,7 +569,7 @@ func (c *cliState) handleDemoAddNode(cmd []string, nextState, errState cliStateE return nextState } - if err := demoCtx.transientCluster.AddNode(cmd[1]); err != nil { + if err := demoCtx.transientCluster.AddNode(context.Background(), cmd[1]); err != nil { return c.internalServerError(errState, err) } addedNodeID := len(demoCtx.transientCluster.servers) diff --git a/pkg/server/server.go b/pkg/server/server.go index 20a0bf6e75dc..4985b4d1eacc 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1281,9 +1281,11 @@ func (s *Server) PreStart(ctx context.Context) error { if s.cfg.TestingKnobs.Server != nil { knobs := s.cfg.TestingKnobs.Server.(*TestingKnobs) if knobs.SignalAfterGettingRPCAddress != nil { + log.Infof(ctx, "signaling caller that RPC address is ready") close(knobs.SignalAfterGettingRPCAddress) } if knobs.PauseAfterGettingRPCAddress != nil { + log.Infof(ctx, "waiting for signal from caller to proceed with initialization") select { case <-knobs.PauseAfterGettingRPCAddress: // Normal case. Just continue below. @@ -1291,13 +1293,14 @@ func (s *Server) PreStart(ctx context.Context) error { case <-ctx.Done(): // Test timeout or some other condition in the caller, by which // we are instructed to stop. - return ctx.Err() + return errors.CombineErrors(errors.New("server stopping prematurely from context shutdown"), ctx.Err()) case <-s.stopper.ShouldQuiesce(): // The server is instructed to stop before it even finished // starting up. 
- return nil + return errors.New("server stopping prematurely") } + log.Infof(ctx, "caller is letting us proceed with initialization") } } diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 0add96df15da..a32faa31b57a 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -987,6 +987,12 @@ func (ts *TestServer) DrainClients(ctx context.Context) error { return ts.drainClients(ctx, nil /* reporter */) } +// Readiness returns nil when the server's health probe reports +// readiness, a readiness error otherwise. +func (ts *TestServer) Readiness(ctx context.Context) error { + return ts.admin.checkReadinessForHealthCheck(ctx) +} + // WriteSummaries implements TestServerInterface. func (ts *TestServer) WriteSummaries() error { return ts.node.writeNodeStatus(context.TODO(), time.Hour, false) From 885c5f379b6a53f511f45d9ee9aee199593db5fb Mon Sep 17 00:00:00 2001 From: Oliver Tan Date: Tue, 23 Mar 2021 19:34:38 +1100 Subject: [PATCH 08/11] cli: disallow adding or shutting down nodes in demo --global In addition to the release note, reworded the add node (which already errored beforehand), and adding a clause in RestartNode. Note there is no code path that allows a restart, since shutdown is required before a node restart. Release note (cli change): No longer support `\demo add/shutdown` on cockroach demo. --- pkg/cli/demo_cluster.go | 11 +++++++++++ pkg/cli/interactive_tests/test_demo_global.tcl | 8 ++++++++ pkg/cli/sql.go | 5 ----- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/pkg/cli/demo_cluster.go b/pkg/cli/demo_cluster.go index 789a35468d84..80e3149e0ae4 100644 --- a/pkg/cli/demo_cluster.go +++ b/pkg/cli/demo_cluster.go @@ -627,6 +627,9 @@ func (c *transientCluster) cleanup(ctx context.Context) { // DrainAndShutdown will gracefully attempt to drain a node in the cluster, and // then shut it down. 
func (c *transientCluster) DrainAndShutdown(nodeID roachpb.NodeID) error { + if demoCtx.simulateLatency { + return errors.Errorf("shutting down nodes is not supported in --%s configurations", cliflags.Global.Name) + } nodeIndex := int(nodeID - 1) if nodeIndex < 0 || nodeIndex >= len(c.servers) { @@ -748,6 +751,9 @@ func (c *transientCluster) RestartNode(nodeID roachpb.NodeID) error { // TODO(#42243): re-compute the latency mapping. // TODO(...): the RPC address of the first server may not be available // if the first server was shut down. + if demoCtx.simulateLatency { + return errors.Errorf("restarting nodes is not supported in --%s configurations", cliflags.Global.Name) + } args := testServerArgsForTransientCluster(c.sockForServer(nodeID), nodeID, c.firstServer.ServingRPCAddr(), c.demoDir, c.sqlFirstPort, c.httpFirstPort, c.stickyEngineRegistry) @@ -783,6 +789,11 @@ func (c *transientCluster) RestartNode(nodeID roachpb.NodeID) error { // This function uses RestartNode to perform the actual node // starting. func (c *transientCluster) AddNode(ctx context.Context, localityString string) error { + // TODO(#42243): re-compute the latency mapping for this to work. + if demoCtx.simulateLatency { + return errors.Errorf("adding nodes is not supported in --%s configurations", cliflags.Global.Name) + } + // '\demo add' accepts both strings that are quoted and not quoted. To properly make use of // quoted strings, strip off the quotes. Before we do that though, make sure that the quotes match, // or that there aren't any quotes in the string. 
diff --git a/pkg/cli/interactive_tests/test_demo_global.tcl b/pkg/cli/interactive_tests/test_demo_global.tcl index d094a035530e..958431e703d2 100644 --- a/pkg/cli/interactive_tests/test_demo_global.tcl +++ b/pkg/cli/interactive_tests/test_demo_global.tcl @@ -18,6 +18,14 @@ send "SELECT region, zones FROM \[SHOW REGIONS FROM CLUSTER\] ORDER BY region;\r eexpect " europe-west1 | {b,c,d}" eexpect " us-east1 | {b,c,d}" eexpect " us-west1 | {a,b,c}" + +# Test we cannot add or restart nodes. +send "\\demo add region=europe-west1\r" +eexpect "adding nodes is not supported in --global configurations" +eexpect "defaultdb>" + +send "\\demo shutdown 3\r" +eexpect "shutting down nodes is not supported in --global configurations" eexpect "defaultdb>" interrupt diff --git a/pkg/cli/sql.go b/pkg/cli/sql.go index 494cf45906bf..e05761bcd96c 100644 --- a/pkg/cli/sql.go +++ b/pkg/cli/sql.go @@ -564,11 +564,6 @@ func (c *cliState) handleDemoAddNode(cmd []string, nextState, errState cliStateE return c.internalServerError(errState, fmt.Errorf("bad call to handleDemoAddNode")) } - if demoCtx.simulateLatency { - fmt.Printf("add command is not supported in --global configurations\n") - return nextState - } - if err := demoCtx.transientCluster.AddNode(context.Background(), cmd[1]); err != nil { return c.internalServerError(errState, err) } From 2ce000cb49fd2ead3b22c12031722033dc0baaa0 Mon Sep 17 00:00:00 2001 From: Oliver Tan Date: Tue, 13 Apr 2021 10:49:07 +1000 Subject: [PATCH 09/11] cli: wrap intro message around printfUnlessEmbedded Release note: None --- pkg/cli/demo.go | 2 +- pkg/cli/sql.go | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/cli/demo.go b/pkg/cli/demo.go index db6dee152a4a..a8d7303abfda 100644 --- a/pkg/cli/demo.go +++ b/pkg/cli/demo.go @@ -293,7 +293,7 @@ func runDemo(cmd *cobra.Command, gen workload.Generator) (err error) { checkInteractive(cmdIn) if cliCtx.isInteractive { - fmt.Printf(`# + printfUnlessEmbedded(`# # Welcome to the CockroachDB 
demo database! # # You are connected to a temporary, in-memory CockroachDB cluster of %d node%s. diff --git a/pkg/cli/sql.go b/pkg/cli/sql.go index e05761bcd96c..e35d034a328c 100644 --- a/pkg/cli/sql.go +++ b/pkg/cli/sql.go @@ -1796,3 +1796,9 @@ func printlnUnlessEmbedded(args ...interface{}) { fmt.Println(args...) } } + +func printfUnlessEmbedded(f string, args ...interface{}) { + if !sqlCtx.embeddedMode { + fmt.Printf(f, args...) + } +} From 0101785c2c126a226190f9258de34c3e67fc0163 Mon Sep 17 00:00:00 2001 From: Oliver Tan Date: Tue, 13 Apr 2021 10:51:56 +1000 Subject: [PATCH 10/11] cli: add notice about --global being experimental on demo startup Release note (cli change): Added a note when starting up a --global demo cluster that it is experimental. --- pkg/cli/demo.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/cli/demo.go b/pkg/cli/demo.go index a8d7303abfda..b63c6fc95941 100644 --- a/pkg/cli/demo.go +++ b/pkg/cli/demo.go @@ -299,6 +299,13 @@ func runDemo(cmd *cobra.Command, gen workload.Generator) (err error) { # You are connected to a temporary, in-memory CockroachDB cluster of %d node%s. `, demoCtx.nodes, util.Pluralize(int64(demoCtx.nodes))) + if demoCtx.simulateLatency { + printfUnlessEmbedded( + "#\n# WARNING: the use of --%s is experimental. Some features may not work as expected.\n", + cliflags.Global.Name, + ) + } + // Only print details about the telemetry configuration if the // user has control over it. if demoCtx.disableTelemetry { From c9432ed50ace0b022583cf81f7f6a2ee9e6fa37f Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Mon, 5 Apr 2021 11:44:33 +0200 Subject: [PATCH 11/11] rpc: avoid a panic in delayingConn If the remote server shuts down while the connection is established, the delaying conn may not complete its handshake. Avoid a panic in that case. 
Release note: None --- pkg/rpc/context.go | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 80cf8875e751..e8b82db3bea5 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -904,6 +904,8 @@ func (d delayingConn) Write(b []byte) (n int, err error) { return n, err } +var errMagicNotFound = errors.New("didn't get expected magic bytes header") + func (d *delayingConn) Read(b []byte) (n int, err error) { if d.readBuf.Len() == 0 { var hdr delayingHeader @@ -912,17 +914,17 @@ func (d *delayingConn) Read(b []byte) (n int, err error) { } // If we somehow don't get our expected magic, throw an error. if hdr.Magic != magic { - panic(errors.New("didn't get expected magic bytes header")) - } else { - // Once we receive our first packet, we set our delay to the expected - // delay that was sent on the write side. - d.latency = time.Duration(hdr.DelayMS) * time.Millisecond - defer func() { - time.Sleep(timeutil.Until(timeutil.Unix(0, hdr.ReadTime))) - }() - if _, err := io.CopyN(d.readBuf, d.Conn, int64(hdr.Sz)); err != nil { - return 0, err - } + return 0, errors.WithStack(errMagicNotFound) + } + + // Once we receive our first packet, we set our delay to the expected + // delay that was sent on the write side. + d.latency = time.Duration(hdr.DelayMS) * time.Millisecond + defer func() { + time.Sleep(timeutil.Until(timeutil.Unix(0, hdr.ReadTime))) + }() + if _, err := io.CopyN(d.readBuf, d.Conn, int64(hdr.Sz)); err != nil { + return 0, err } } return d.readBuf.Read(b)