From b483a265219e41d362848a687e15a0df138c26d2 Mon Sep 17 00:00:00 2001
From: Raphael 'kena' Poss
Date: Tue, 12 Jan 2021 17:00:39 +0100
Subject: [PATCH 1/3] server,cli: ensure the server identifiers are properly logged in SQL pods

This is a forward-port of a change we've discovered was necessary in
v20.2: we want the cluster ID and other server identifiers to be
reported in log files. This was not possible before.

Release note: None
---
 pkg/ccl/backupccl/backup_test.go              | 23 ++++++++++++
 .../backupccl/create_scheduled_backup_test.go | 13 +++++++
 pkg/ccl/importccl/import_stmt_test.go         |  4 +++
 .../interactive_tests/test_log_config_msg.tcl |  7 ++--
 pkg/cli/mt_start_sql.go                       |  7 +---
 pkg/cmd/roachtest/multitenant.go              | 35 +++++++++++++++++--
 pkg/server/testserver.go                      | 30 ++++++++++------
 pkg/sql/logictest/logic.go                    | 11 +++++-
 pkg/sql/telemetry_test.go                     |  3 ++
 pkg/util/log/clog.go                          |  4 ++-
 pkg/util/log/flags.go                         | 10 ++++++
 pkg/util/log/test_log_scope.go                |  4 +++
 12 files changed, 129 insertions(+), 22 deletions(-)

diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go
index 02b2ba72c0da..50fa7909dc71 100644
--- a/pkg/ccl/backupccl/backup_test.go
+++ b/pkg/ccl/backupccl/backup_test.go
@@ -6093,6 +6093,8 @@ func TestProtectedTimestampsFailDueToLimits(t *testing.T) {
 
 func TestPaginatedBackupTenant(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
 	defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()
 
 	const numAccounts = 1
@@ -6192,6 +6194,8 @@ func TestPaginatedBackupTenant(t *testing.T) {
 // Ensure that backing up and restoring tenants succeeds.
 func TestBackupRestoreTenant(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
 	defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()
 
 	const numAccounts = 1
@@ -6208,11 +6212,16 @@ func TestBackupRestoreTenant(t *testing.T) {
 	tenant10 := sqlutils.MakeSQLRunner(conn10)
 	tenant10.Exec(t, `CREATE DATABASE foo; CREATE TABLE foo.bar(i int primary key); INSERT INTO foo.bar VALUES (110), (210)`)
 
+	// Prevent a logging assertion that the server ID is initialized multiple times.
+	log.TestingClearServerIdentifiers()
+
 	_, conn11 := serverutils.StartTenant(t, srv, base.TestTenantArgs{TenantID: roachpb.MakeTenantID(11)})
 	defer conn11.Close()
 	tenant11 := sqlutils.MakeSQLRunner(conn11)
 	tenant11.Exec(t, `CREATE DATABASE foo; CREATE TABLE foo.baz(i int primary key); INSERT INTO foo.baz VALUES (111), (211)`)
 
+	log.TestingClearServerIdentifiers()
+
 	_, conn20 := serverutils.StartTenant(t, srv, base.TestTenantArgs{TenantID: roachpb.MakeTenantID(20)})
 	defer conn20.Close()
 	tenant20 := sqlutils.MakeSQLRunner(conn20)
@@ -6268,6 +6277,8 @@ func TestBackupRestoreTenant(t *testing.T) {
 			[][]string{{`10`, `true`, `{"id": "10", "state": "ACTIVE"}`}},
 		)
 
+		log.TestingClearServerIdentifiers()
+
 		ten10Stopper := stop.NewStopper()
 		_, restoreConn10 := serverutils.StartTenant(
 			t, restoreTC.Server(0), base.TestTenantArgs{
@@ -6302,6 +6313,8 @@ func TestBackupRestoreTenant(t *testing.T) {
 			[][]string{{`10`, `true`, `{"id": "10", "state": "ACTIVE"}`}},
 		)
 
+		log.TestingClearServerIdentifiers()
+
 		_, restoreConn10 = serverutils.StartTenant(
 			t, restoreTC.Server(0), base.TestTenantArgs{TenantID: roachpb.MakeTenantID(10), Existing: true},
 		)
@@ -6326,6 +6339,8 @@ func TestBackupRestoreTenant(t *testing.T) {
 			[][]string{{`10`, `true`, `{"id": "10", "state": "ACTIVE"}`}},
 		)
 
+		log.TestingClearServerIdentifiers()
+
 		_, restoreConn10 := serverutils.StartTenant(
 			t, restoreTC.Server(0), base.TestTenantArgs{TenantID: roachpb.MakeTenantID(10), Existing: true},
 		)
@@ -6355,6 +6370,8 @@ func TestBackupRestoreTenant(t *testing.T) {
 			},
 		)
 
+		log.TestingClearServerIdentifiers()
+
 		_, restoreConn10 := serverutils.StartTenant(
 			t, restoreTC.Server(0), base.TestTenantArgs{TenantID: roachpb.MakeTenantID(10), Existing: true},
 		)
@@ -6364,6 +6381,8 @@ func TestBackupRestoreTenant(t *testing.T) {
 		restoreTenant10.CheckQueryResults(t, `select * from foo.bar`, tenant10.QueryStr(t, `select * from foo.bar`))
 		restoreTenant10.CheckQueryResults(t, `select * from foo.bar2`, tenant10.QueryStr(t, `select * from foo.bar2`))
 
+		log.TestingClearServerIdentifiers()
+
 		_, restoreConn11 := serverutils.StartTenant(
 			t, restoreTC.Server(0), base.TestTenantArgs{TenantID: roachpb.MakeTenantID(11), Existing: true},
 		)
@@ -6382,6 +6401,8 @@ func TestBackupRestoreTenant(t *testing.T) {
 		restoreDB.Exec(t, `RESTORE TENANT 10 FROM 'nodelocal://1/t10' AS OF SYSTEM TIME `+ts1)
 
+		log.TestingClearServerIdentifiers()
+
 		_, restoreConn10 := serverutils.StartTenant(
 			t, restoreTC.Server(0), base.TestTenantArgs{TenantID: roachpb.MakeTenantID(10), Existing: true},
 		)
@@ -6400,6 +6421,8 @@ func TestBackupRestoreTenant(t *testing.T) {
 		restoreDB.Exec(t, `RESTORE TENANT 20 FROM 'nodelocal://1/t20'`)
 
+		log.TestingClearServerIdentifiers()
+
 		_, restoreConn20 := serverutils.StartTenant(
 			t, restoreTC.Server(0), base.TestTenantArgs{TenantID: roachpb.MakeTenantID(20), Existing: true},
 		)
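Each of the hunks above follows the same pattern whenever a test starts more than one in-process tenant. As a minimal sketch (tenant IDs and variable names are illustrative; the APIs are the ones used in the hunks above):

	_, conn10 := serverutils.StartTenant(t, srv, base.TestTenantArgs{TenantID: roachpb.MakeTenantID(10)})
	defer conn10.Close()

	// The log package asserts that the server identity is set only once per
	// process; clear it before starting the next in-process tenant server.
	log.TestingClearServerIdentifiers()

	_, conn11 := serverutils.StartTenant(t, srv, base.TestTenantArgs{TenantID: roachpb.MakeTenantID(11)})
	defer conn11.Close()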
diff --git a/pkg/ccl/backupccl/create_scheduled_backup_test.go b/pkg/ccl/backupccl/create_scheduled_backup_test.go
index 902b4834eec4..be8ebf0907aa 100644
--- a/pkg/ccl/backupccl/create_scheduled_backup_test.go
+++ b/pkg/ccl/backupccl/create_scheduled_backup_test.go
@@ -35,6 +35,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 	pbtypes "github.com/gogo/protobuf/types"
@@ -178,6 +179,8 @@ func (t userType) String() string {
 // itself with the actual scheduling and the execution of those backups.
 func TestSerializesScheduledBackupExecutionArgs(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
 	th, cleanup := newTestHelper(t)
 	defer cleanup()
@@ -435,6 +438,8 @@ func TestSerializesScheduledBackupExecutionArgs(t *testing.T) {
 
 func TestScheduleBackup(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
 	th, cleanup := newTestHelper(t)
 	defer cleanup()
@@ -578,6 +583,8 @@ INSERT INTO t1 values (-1), (10), (-100);
 
 func TestCreateBackupScheduleRequiresAdminRole(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
 	th, cleanup := newTestHelper(t)
 	defer cleanup()
@@ -599,6 +606,8 @@ func TestCreateBackupScheduleRequiresAdminRole(t *testing.T) {
 
 func TestCreateBackupScheduleCollectionOverwrite(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
 	th, cleanup := newTestHelper(t)
 	defer cleanup()
@@ -619,6 +628,8 @@ func TestCreateBackupScheduleCollectionOverwrite(t *testing.T) {
 
 func TestCreateBackupScheduleInExplicitTxnRollback(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
 	th, cleanup := newTestHelper(t)
 	defer cleanup()
@@ -639,6 +650,8 @@ func TestCreateBackupScheduleInExplicitTxnRollback(t *testing.T) {
 // (eventually), even after the cluster has been down for a long period.
 func TestScheduleBackupRecoversFromClusterDown(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
 	th, cleanup := newTestHelper(t)
 	defer cleanup()
diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go
index ef76fbb2aa78..3e4d9fac206a 100644
--- a/pkg/ccl/importccl/import_stmt_test.go
+++ b/pkg/ccl/importccl/import_stmt_test.go
@@ -6258,6 +6258,7 @@ func TestDisallowsInvalidFormatOptions(t *testing.T) {
 
 func TestImportInTenant(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
 	baseDir := filepath.Join("testdata")
@@ -6272,6 +6273,9 @@ func TestImportInTenant(t *testing.T) {
 	defer conn10.Close()
 	t10 := sqlutils.MakeSQLRunner(conn10)
 
+	// Prevent a logging assertion that the server ID is initialized multiple times.
+	log.TestingClearServerIdentifiers()
+
 	// Setup a few tenants, each with a different table.
 	_, conn11 := serverutils.StartTenant(t, tc.Server(0), base.TestTenantArgs{TenantID: roachpb.MakeTenantID(11)})
 	defer conn11.Close()
diff --git a/pkg/cli/interactive_tests/test_log_config_msg.tcl b/pkg/cli/interactive_tests/test_log_config_msg.tcl
index 6b0a89d47e82..8c6b62017c17 100644
--- a/pkg/cli/interactive_tests/test_log_config_msg.tcl
+++ b/pkg/cli/interactive_tests/test_log_config_msg.tcl
@@ -7,9 +7,10 @@ source [file join [file dirname $argv0] common.tcl]
 
 start_server $argv
 
-start_test "Check that the cluster ID is reported at the start of the first log file."
+start_test "Check that the cluster and node IDs are reported at the start of the first log file."
 spawn tail -n 1000 -F logs/db/logs/cockroach.log
 eexpect "\\\[config\\\] clusterID:"
+eexpect "\\\[config\\\] nodeID:"
 eexpect "node startup completed"
 end_test
 
@@ -22,7 +23,7 @@ system "$argv start-single-node --insecure --pid-file=server_pid --background -s
 # Stop the server, which also flushes and closes the log files.
 stop_server $argv
 
-start_test "Check that the cluster ID is reported at the start of new log files."
+start_test "Check that the cluster and node IDs are reported at the start of new log files."
 # Verify that the string "restarted pre-existing node" can be found
 # somewhere. This ensures that if this string ever changes, the test
 # below won't report a false negative.
@@ -32,4 +33,6 @@ system "grep -q 'restarted pre-existing node' logs/db/logs/*.log"
 system "if grep -q 'restarted pre-existing node' logs/db/logs/cockroach.log; then false; fi"
 # Verify that the last log file does contain the cluster ID.
 system "grep -qF '\[config\] clusterID:' logs/db/logs/cockroach.log"
+system "grep -qF '\[config\] nodeID:' logs/db/logs/cockroach.log"
 end_test
+
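With this change the server identifiers appear under the config log tag at the top of every log file. The values below are made up; the shape follows the format strings in pkg/util/log/clog.go. A KV node reports, for example:

	[config] clusterID: 2f7dcbc3-...
	[config] nodeID: n1

and a SQL tenant pod reports:

	[config] clusterID: 2f7dcbc3-...
	[config] tenantID: 10
	[config] instanceID: 1

These are the lines that the expect script above and the roachtest below grep for.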
diff --git a/pkg/cli/mt_start_sql.go b/pkg/cli/mt_start_sql.go
index 8d7849ac459b..524b1ce1c8a0 100644
--- a/pkg/cli/mt_start_sql.go
+++ b/pkg/cli/mt_start_sql.go
@@ -98,7 +98,7 @@ func runStartSQL(cmd *cobra.Command, args []string) error {
 		tempStorageMaxSizeBytes,
 	)
 
-	sqlServer, addr, httpAddr, instanceID, err := server.StartTenant(
+	sqlServer, addr, httpAddr, err := server.StartTenant(
 		ctx,
 		stopper,
 		clusterName,
@@ -116,11 +116,6 @@ func runStartSQL(cmd *cobra.Command, args []string) error {
 		sqlServer.StartDiagnostics(ctx)
 	}
 
-	// Register the server's identifiers so that log events are
-	// decorated with the server's identity. This helps when gathering
-	// log events from multiple servers into the same log collector.
-	log.SetTenantIDs(serverCfg.SQLConfig.TenantID.String(), int32(instanceID))
-
 	log.Infof(ctx, "SQL server for tenant %s listening at %s, http at %s",
 		serverCfg.SQLConfig.TenantID, addr, httpAddr)
 
 	// TODO(tbg): make the other goodies in `./cockroach start` reusable, such as
diff --git a/pkg/cmd/roachtest/multitenant.go b/pkg/cmd/roachtest/multitenant.go
index 0bd741a9ccdf..3896848863cf 100644
--- a/pkg/cmd/roachtest/multitenant.go
+++ b/pkg/cmd/roachtest/multitenant.go
@@ -17,6 +17,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 )
 
@@ -29,9 +30,12 @@ func runAcceptanceMultitenant(ctx context.Context, t *test, c *cluster) {
 	require.NoError(t, err)
 
 	kvAddrs := c.ExternalAddr(ctx, c.All())
-	errCh := make(chan error)
+
+	tenantCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	errCh := make(chan error, 1)
 	go func() {
-		errCh <- c.RunE(ctx, c.Node(1),
+		errCh <- c.RunE(tenantCtx, c.Node(1),
 			"./cockroach", "mt", "start-sql",
 			// TODO(tbg): make this test secure.
 			// "--certs-dir", "certs",
@@ -41,7 +45,10 @@ func runAcceptanceMultitenant(ctx context.Context, t *test, c *cluster) {
 			"--kv-addrs", strings.Join(kvAddrs, ","),
 			// Don't bind to external interfaces when running locally.
 			"--sql-addr", ifLocal("127.0.0.1", "0.0.0.0")+":36257",
+			// Ensure that log files get created.
+			"--log='file-defaults: {dir: .}'",
 		)
+		close(errCh)
 	}()
 	u, err := url.Parse(c.ExternalPGUrl(ctx, c.Node(1))[0])
 	require.NoError(t, err)
@@ -57,6 +64,8 @@ func runAcceptanceMultitenant(ctx context.Context, t *test, c *cluster) {
 	default:
 	}
 
+	t.Status("checking that a client can connect to the tenant server")
+
 	db, err := gosql.Open("postgres", url)
 	if err != nil {
 		t.Fatal(err)
@@ -73,4 +82,26 @@ func runAcceptanceMultitenant(ctx context.Context, t *test, c *cluster) {
 	require.NoError(t, db.QueryRow(`SELECT * FROM foo LIMIT 1`).Scan(&id, &v))
 	require.Equal(t, 1, id)
 	require.Equal(t, "bar", v)
+
+	t.Status("stopping the server ahead of checking for the tenant server")
+
+	// Stop the server, which also ensures that log files get flushed.
+	cancel()
+	<-errCh
+
+	t.Status("checking log file contents")
+
+	// Check that the server identifiers are present in the tenant log file.
+	if err := c.RunE(ctx, c.Node(1),
+		"grep", "-q", "'\\[config\\] clusterID:'", "cockroach.log"); err != nil {
+		t.Fatal(errors.Wrap(err, "cluster ID not found in log file"))
+	}
+	if err := c.RunE(ctx, c.Node(1),
+		"grep", "-q", "'\\[config\\] tenantID:'", "cockroach.log"); err != nil {
+		t.Fatal(errors.Wrap(err, "tenant ID not found in log file"))
+	}
+	if err := c.RunE(ctx, c.Node(1),
+		"grep", "-q", "'\\[config\\] instanceID:'", "cockroach.log"); err != nil {
+		t.Fatal(errors.Wrap(err, "SQL instance ID not found in log file"))
+	}
 }
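To run the same check by hand against a tenant server started with the --log configuration above, a single grep along these lines should suffice (the file name assumes the file-defaults directory used by the test):

	grep -E '\[config\] (clusterID|tenantID|instanceID):' cockroach.log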
diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go
index 3d1e679c089a..e08c5c81918d 100644
--- a/pkg/server/testserver.go
+++ b/pkg/server/testserver.go
@@ -57,6 +57,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/ts"
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/metric"
 	"github.com/cockroachdb/cockroach/pkg/util/netutil"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
@@ -645,7 +646,7 @@ func (ts *TestServer) StartTenant(
 	if stopper == nil {
 		stopper = ts.Stopper()
 	}
-	sqlServer, addr, httpAddr, _, err := StartTenant(
+	sqlServer, addr, httpAddr, err := StartTenant(
 		ctx,
 		stopper,
 		ts.Cfg.ClusterName,
@@ -662,14 +663,14 @@ func StartTenant(
 	kvClusterName string, // NB: gone after https://github.com/cockroachdb/cockroach/issues/42519
 	baseCfg BaseConfig,
 	sqlCfg SQLConfig,
-) (sqlServer *SQLServer, pgAddr string, httpAddr string, instanceID base.SQLInstanceID, _ error) {
+) (sqlServer *SQLServer, pgAddr string, httpAddr string, _ error) {
 	args, err := makeSQLServerArgs(stopper, kvClusterName, baseCfg, sqlCfg)
 	if err != nil {
-		return nil, "", "", 0, err
+		return nil, "", "", err
 	}
 	s, err := newSQLServer(ctx, args)
 	if err != nil {
-		return nil, "", "", 0, err
+		return nil, "", "", err
 	}
 
 	// TODO(asubiotto): remove this. Right now it is needed to initialize the
@@ -686,7 +687,7 @@ func StartTenant(
 	pgL, err := listen(ctx, &args.Config.SQLAddr, &args.Config.SQLAdvertiseAddr, "sql")
 	if err != nil {
-		return nil, "", "", 0, err
+		return nil, "", "", err
 	}
 
 	args.stopper.RunWorker(ctx, func(ctx context.Context) {
@@ -700,7 +701,7 @@ func StartTenant(
 	httpL, err := listen(ctx, &args.Config.HTTPAddr, &args.Config.HTTPAdvertiseAddr, "http")
 	if err != nil {
-		return nil, "", "", 0, err
+		return nil, "", "", err
 	}
 
 	args.stopper.RunWorker(ctx, func(ctx context.Context) {
@@ -750,7 +751,7 @@ func StartTenant(
 		heapProfileDirName: args.HeapProfileDirName,
 		runtime:            args.runtime,
 	}); err != nil {
-		return nil, "", "", 0, err
+		return nil, "", "", err
 	}
 
 	s.execCfg.DistSQLPlanner.SetNodeInfo(roachpb.NodeDescriptor{NodeID: roachpb.NodeID(args.nodeIDContainer.SQLInstanceID())})
@@ -763,18 +764,27 @@ func StartTenant(
 		socketFile,
 		orphanedLeasesTimeThresholdNanos,
 	); err != nil {
-		return nil, "", "", 0, err
+		return nil, "", "", err
 	}
 
+	// Register the server's identifiers so that log events are
+	// decorated with the server's identity. This helps when gathering
+	// log events from multiple servers into the same log collector.
+	//
+	// We do this only here, as the identifiers may not be known before this point.
+	clusterID := args.rpcContext.ClusterID.Get().String()
+	log.SetNodeIDs(clusterID, 0 /* nodeID is not known for a SQL-only server. */)
+	log.SetTenantIDs(args.TenantID.String(), int32(s.SQLInstanceID()))
+
 	if err := s.startServeSQL(ctx, args.stopper, s.connManager, s.pgL, socketFile); err != nil {
-		return nil, "", "", 0, err
+		return nil, "", "", err
 	}
 
-	return s, pgLAddr, httpLAddr, args.nodeIDContainer.SQLInstanceID(), nil
+	return s, pgLAddr, httpLAddr, nil
 }
 
 // ExpectedInitialRangeCount returns the expected number of ranges that should
diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go
index 28ccd33a26ab..5fffaa4d6b4d 100644
--- a/pkg/sql/logictest/logic.go
+++ b/pkg/sql/logictest/logic.go
@@ -1426,6 +1426,10 @@ func (t *logicTest) newCluster(serverArgs TestServerArgs) {
 			},
 		},
 	}
+
+	// Prevent a logging assertion that the server ID is initialized multiple times.
+	log.TestingClearServerIdentifiers()
+
 	tenant, err := t.cluster.Server(t.nodeIdx).StartTenant(tenantArgs)
 	if err != nil {
 		t.rootT.Fatalf("%+v", err)
@@ -3245,9 +3249,14 @@ func RunLogicTestWithDefaultConfig(
 	// - we're generating testfiles, or
 	// - we are in race mode (where we can hit a limit on alive
 	//   goroutines).
-	if !*showSQL && !*rewriteResultsInTestfiles && !*rewriteSQL && !util.RaceEnabled {
+	if !*showSQL && !*rewriteResultsInTestfiles && !*rewriteSQL && !util.RaceEnabled && !cfg.useTenant {
 		// Skip parallelizing tests that use the kv-batch-size directive since
 		// the batch size is a global variable.
+		//
+		// We also cannot parallelize tests that use tenant servers
+		// because they change shared state in the logging configuration
+		// and there is an assertion against conflicting changes.
+		//
 		// TODO(jordan, radu): make sqlbase.kvBatchSize non-global to fix this.
 		if filepath.Base(path) != "select_index_span_ranges" {
 			t.Parallel() // SAFE FOR TESTING (this comments satisfies the linter)
diff --git a/pkg/sql/telemetry_test.go b/pkg/sql/telemetry_test.go
index 5dbe119fb97b..a04d7f18b1e9 100644
--- a/pkg/sql/telemetry_test.go
+++ b/pkg/sql/telemetry_test.go
@@ -145,6 +145,9 @@ func (tt *telemetryTest) Start(t *testing.T) {
 	tt.server, tt.serverDB, _ = serverutils.StartServer(tt.t, params)
 	tt.prepareCluster(tt.serverDB)
 
+	// Prevent a logging assertion that the server ID is initialized multiple times.
+	log.TestingClearServerIdentifiers()
+
 	tt.tenant, tt.tenantDB = serverutils.StartTenant(tt.t, tt.server, base.TestTenantArgs{
 		TenantID:                    roachpb.MakeTenantID(security.EmbeddedTenantIDs()[0]),
 		AllowSettingClusterSettings: true,
diff --git a/pkg/util/log/clog.go b/pkg/util/log/clog.go
index ce4ed0c6f75a..4a326669272d 100644
--- a/pkg/util/log/clog.go
+++ b/pkg/util/log/clog.go
@@ -216,7 +216,9 @@ func SetNodeIDs(clusterID string, nodeID int32) {
 	// will always find it.
 	ctx := logtags.AddTag(context.Background(), "config", nil)
 	logfDepth(ctx, 1, severity.INFO, channel.OPS, "clusterID: %s", clusterID)
-	logfDepth(ctx, 1, severity.INFO, channel.OPS, "nodeID: n%s", nodeID)
+	if nodeID != 0 {
+		logfDepth(ctx, 1, severity.INFO, channel.OPS, "nodeID: n%d", nodeID)
+	}
 
 	// Perform the change proper.
 	logging.idMu.Lock()
diff --git a/pkg/util/log/flags.go b/pkg/util/log/flags.go
index 24b80f906114..351fa64808a9 100644
--- a/pkg/util/log/flags.go
+++ b/pkg/util/log/flags.go
@@ -332,6 +332,16 @@ func (l *sinkInfo) describeAppliedConfig() (c logconfig.CommonSinkConfig) {
 	return c
 }
 
+// TestingClearServerIdentifiers clears the server identity from the
+// logging system. This is for use in tests that successively start
+// multiple servers with conflicting identities.
+// See discussion here: https://github.com/cockroachdb/cockroach/issues/58938
+func TestingClearServerIdentifiers() {
+	logging.idMu.Lock()
+	logging.idMu.idPayload = idPayload{}
+	logging.idMu.Unlock()
+}
+
 // TestingResetActive clears the active bit. This is for use in tests
 // that use stderr redirection alongside other tests that use
 // logging.
diff --git a/pkg/util/log/test_log_scope.go b/pkg/util/log/test_log_scope.go
index 2066922b3d17..9a3958910929 100644
--- a/pkg/util/log/test_log_scope.go
+++ b/pkg/util/log/test_log_scope.go
@@ -160,6 +160,10 @@ func ScopeWithoutShowLogs(t tShim) (sc *TestLogScope) {
 		t.Fatal(err)
 	}
 
+	// Reset the server identifiers, so that new servers
+	// can report their IDs through logging.
+	TestingClearServerIdentifiers()
+
 	// Switch to the new configuration.
 	TestingResetActive()
 	sc.cleanupFn, err = ApplyConfig(cfg)

From fe89c58368875027187321665554e80a34f1f2dc Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Wed, 16 Dec 2020 16:51:16 -0500
Subject: [PATCH 2/3] roachprod: Support AWS GP3 drives.

Add support for spinning up VMs with GP3 drives.
Make it possible to specify multiple, possibly different EBS volumes
for each AWS instance.

Release note: None
---
 pkg/cmd/roachprod/vm/aws/aws.go | 168 ++++++++++++++++++++++++++------
 1 file changed, 140 insertions(+), 28 deletions(-)

diff --git a/pkg/cmd/roachprod/vm/aws/aws.go b/pkg/cmd/roachprod/vm/aws/aws.go
index dcf5fdb373f8..70290a8edd2d 100644
--- a/pkg/cmd/roachprod/vm/aws/aws.go
+++ b/pkg/cmd/roachprod/vm/aws/aws.go
@@ -13,6 +13,7 @@ package aws
 import (
 	"encoding/json"
 	"fmt"
+	"io/ioutil"
 	"log"
 	"math/rand"
 	"os"
@@ -65,19 +66,115 @@ func init() {
 	vm.Providers[ProviderName] = p
 }
 
+// ebsDisk represents an EBS disk device.
+// When marshaled to JSON format, produces the JSON specification used
+// by the AWS SDK to configure attached volumes.
+type ebsDisk struct {
+	VolumeType          string `json:"VolumeType"`
+	VolumeSize          int    `json:"VolumeSize"`
+	IOPs                int    `json:"Iops,omitempty"`
+	Throughput          int    `json:"Throughput,omitempty"`
+	DeleteOnTermination bool   `json:"DeleteOnTermination"`
+}
+
+// ebsVolume represents a mounted volume: name + ebsDisk
+type ebsVolume struct {
+	DeviceName string  `json:"DeviceName"`
+	Disk       ebsDisk `json:"Ebs"`
+}
+
+const ebsDefaultVolumeSizeGB = 500
+
+// Set implements flag Value interface.
+func (d *ebsDisk) Set(s string) error {
+	if err := json.Unmarshal([]byte(s), &d); err != nil {
+		return err
+	}
+
+	d.DeleteOnTermination = true
+
+	// Sanity check disk configuration.
+	// This is not strictly needed since the AWS SDK would return an error anyway,
+	// but we can return a nicer error message sooner.
+	if d.VolumeSize == 0 {
+		d.VolumeSize = ebsDefaultVolumeSizeGB
+	}
+
+	switch strings.ToLower(d.VolumeType) {
+	case "gp2":
+		// Nothing -- size checked above.
+	case "gp3":
+		if d.IOPs > 16000 {
+			return errors.AssertionFailedf("Iops required for gp3 disk: [3000, 16000]")
+		}
+		if d.IOPs == 0 {
+			// 3000 is the base IOPs for gp3.
+			d.IOPs = 3000
+		}
+		if d.Throughput == 0 {
+			// 125 MB/s is the base throughput for gp3.
+			d.Throughput = 125
+		}
+	case "io1", "io2":
+		if d.IOPs == 0 {
+			return errors.AssertionFailedf("Iops required for %s disk", d.VolumeType)
+		}
+	default:
+		return errors.Errorf("Unknown EBS volume type %s", d.VolumeType)
+	}
+	return nil
+}
+
+// Type implements flag Value interface.
+func (d *ebsDisk) Type() string {
+	return "JSON"
+}
+
+// String implements flag Value interface.
+func (d *ebsDisk) String() string {
+	return "EBSDisk"
+}
+
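Set parses its argument with json.Unmarshal, so a volume specification is a JSON object using the field names declared on ebsDisk above. An illustrative gp3 specification (all values are examples only):

	{"VolumeType": "gp3", "VolumeSize": 500, "Iops": 4000, "Throughput": 250}

Fields left out fall back to the defaults applied in Set: 500 GB, and for gp3, 3000 IOPs and 125 MB/s of throughput.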
+type ebsVolumeList []*ebsVolume
+
+func (vl *ebsVolumeList) newVolume() *ebsVolume {
+	return &ebsVolume{
+		DeviceName: fmt.Sprintf("/dev/sd%c", 'd'+len(*vl)),
+	}
+}
+
+// Set implements flag Value interface.
+func (vl *ebsVolumeList) Set(s string) error {
+	v := vl.newVolume()
+	if err := v.Disk.Set(s); err != nil {
+		return err
+	}
+	*vl = append(*vl, v)
+	return nil
+}
+
+// Type implements flag Value interface.
+func (vl *ebsVolumeList) Type() string {
+	return "JSON"
+}
+
+// String implements flag Value interface.
+func (vl *ebsVolumeList) String() string {
+	return "EBSVolumeList"
+}
+
 // providerOpts implements the vm.ProviderFlags interface for aws.Provider.
 type providerOpts struct {
 	Profile string
 	Config  *awsConfig
 
-	MachineType        string
-	SSDMachineType     string
-	CPUOptions         string
-	RemoteUserName     string
-	EBSVolumeType      string
-	EBSVolumeSize      int
-	EBSProvisionedIOPs int
-	UseMultipleDisks   bool
+	MachineType      string
+	SSDMachineType   string
+	CPUOptions       string
+	RemoteUserName   string
+	DefaultEBSVolume ebsVolume
+	EBSVolumes       ebsVolumeList
+	UseMultipleDisks bool
 
 	// Use specified ImageAMI when provisioning.
 	// Overrides config.json AMI.
@@ -122,7 +219,6 @@ var defaultCreateZones = []string{
 // somewhat complicated because different EC2 regions may as well
 // be parallel universes.
 func (o *providerOpts) ConfigureCreateFlags(flags *pflag.FlagSet) {
-
 	// m5.xlarge is a 4core, 16Gb instance, approximately equal to a GCE n1-standard-4
 	flags.StringVar(&o.MachineType, ProviderName+"-machine-type", defaultMachineType,
 		"Machine type (see https://aws.amazon.com/ec2/instance-types/)")
@@ -139,13 +235,17 @@ func (o *providerOpts) ConfigureCreateFlags(flags *pflag.FlagSet) {
 	flags.StringVar(&o.RemoteUserName, ProviderName+"-user",
 		"ubuntu", "Name of the remote user to SSH as")
 
-	flags.StringVar(&o.EBSVolumeType, ProviderName+"-ebs-volume-type",
-		"gp2", "Type of the EBS volume, only used if local-ssd=false")
-	flags.IntVar(&o.EBSVolumeSize, ProviderName+"-ebs-volume-size",
-		500, "Size in GB of EBS volume, only used if local-ssd=false")
-	flags.IntVar(&o.EBSProvisionedIOPs, ProviderName+"-ebs-iops",
-		1000, "Number of IOPs to provision, only used if "+ProviderName+
-			"-ebs-volume-type=io1")
+	flags.StringVar(&o.DefaultEBSVolume.Disk.VolumeType, ProviderName+"-ebs-volume-type",
+		"", "Type of the EBS volume, only used if local-ssd=false")
+	flags.IntVar(&o.DefaultEBSVolume.Disk.VolumeSize, ProviderName+"-ebs-volume-size",
+		ebsDefaultVolumeSizeGB, "Size in GB of EBS volume, only used if local-ssd=false")
+	flags.IntVar(&o.DefaultEBSVolume.Disk.IOPs, ProviderName+"-ebs-iops",
+		0, "Number of IOPs to provision for supported disk types (io1, io2, gp3)")
+	flags.IntVar(&o.DefaultEBSVolume.Disk.Throughput, ProviderName+"-ebs-throughput",
+		0, "Additional throughput to provision, in MiB/s")
+
+	flags.VarP(&o.EBSVolumes, ProviderName+"-ebs-volume", "",
+		"Additional EBS disk to attach; specified as JSON, e.g.: {\"VolumeType\":\"io2\",\"VolumeSize\":213,\"Iops\":321}")
 
 	flags.StringSliceVar(&o.CreateZones, ProviderName+"-zones", nil,
 		fmt.Sprintf("aws availability zones to use for cluster creation. If zones are formatted\n"+
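Putting the new flags together, a hypothetical roachprod invocation (cluster name, node count and all numbers are placeholders) could look like:

	roachprod create $USER-gp3 -n 4 --clouds aws --local-ssd=false \
	  --aws-ebs-volume-type gp3 --aws-ebs-iops 4000 --aws-ebs-throughput 250 \
	  --aws-ebs-volume '{"VolumeType": "io2", "VolumeSize": 213, "Iops": 1000}'

The first three flags shape the default data volume; each --aws-ebs-volume adds one more device, which newVolume names /dev/sdd, /dev/sde, and so on. runInstance below then hands the whole set to the AWS CLI as a block-device mapping.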
@@ -723,21 +823,33 @@ func (p *Provider) runInstance(name string, zone string, opts vm.CreateOpts) err
 	// The local NVMe devices are automatically mapped. Otherwise, we need to map an EBS data volume.
 	if !opts.SSDOpts.UseLocalSSD {
-		var ebsParams string
-		switch t := p.opts.EBSVolumeType; t {
-		case "gp2":
-			ebsParams = fmt.Sprintf("{VolumeSize=%d,VolumeType=%s,DeleteOnTermination=true}",
-				p.opts.EBSVolumeSize, t)
-		case "io1", "io2":
-			ebsParams = fmt.Sprintf("{VolumeSize=%d,VolumeType=%s,Iops=%d,DeleteOnTermination=true}",
-				p.opts.EBSVolumeSize, t, p.opts.EBSProvisionedIOPs)
-		default:
-			return errors.Errorf("Unknown EBS volume type %s", t)
+		if len(p.opts.EBSVolumes) == 0 && p.opts.DefaultEBSVolume.Disk.VolumeType == "" {
+			p.opts.DefaultEBSVolume.Disk.VolumeType = "gp2"
+		}
+
+		if p.opts.DefaultEBSVolume.Disk.VolumeType != "" {
+			// Add default volume to the list of volumes we'll setup.
+			v := p.opts.EBSVolumes.newVolume()
+			v.Disk = p.opts.DefaultEBSVolume.Disk
+			p.opts.EBSVolumes = append(p.opts.EBSVolumes, v)
+		}
+
+		mapping, err := json.Marshal(p.opts.EBSVolumes)
+		if err != nil {
+			return err
+		}
+
+		deviceMapping, err := ioutil.TempFile("", "aws-block-device-mapping")
+		if err != nil {
+			return err
+		}
+		defer deviceMapping.Close()
+		if _, err := deviceMapping.Write(mapping); err != nil {
+			return err
 		}
 		args = append(args,
 			"--block-device-mapping",
-			// Size is measured in GB. gp2 type derives guaranteed iops from size.
-			"DeviceName=/dev/sdd,Ebs="+ebsParams,
+			"file://"+deviceMapping.Name(),
 		)
 	}
 	return p.runJSONCommand(args, &data)

From 3457bf76fba45c384c71a52b1b5047ff3e8dc122 Mon Sep 17 00:00:00 2001
From: Raphael 'kena' Poss
Date: Fri, 15 Jan 2021 19:10:02 +0100
Subject: [PATCH 3/3] util/log: log tenant and instance ID on the OPS channel.

Release note: None
---
 pkg/util/log/clog.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/util/log/clog.go b/pkg/util/log/clog.go
index 4a326669272d..35f17f431c12 100644
--- a/pkg/util/log/clog.go
+++ b/pkg/util/log/clog.go
@@ -238,8 +238,8 @@ func SetTenantIDs(tenantID string, sqlInstanceID int32) {
 	// new log files, even on the first log file. This ensures that grep
 	// will always find it.
 	ctx := logtags.AddTag(context.Background(), "config", nil)
-	logfDepth(ctx, 1, severity.INFO, channel.DEV, "tenantID: %s", tenantID)        // TODO(knz): Use OPS here.
-	logfDepth(ctx, 1, severity.INFO, channel.DEV, "instanceID: %d", sqlInstanceID) // TODO(knz): Use OPS here.
+	logfDepth(ctx, 1, severity.INFO, channel.OPS, "tenantID: %s", tenantID)
+	logfDepth(ctx, 1, severity.INFO, channel.OPS, "instanceID: %d", sqlInstanceID)
 
 	// Perform the change proper.
 	logging.idMu.Lock()