diff --git a/pkg/bench/rttanalysis/testdata/benchmark_expectations b/pkg/bench/rttanalysis/testdata/benchmark_expectations index 9c448964a81a..a2344bb51aee 100644 --- a/pkg/bench/rttanalysis/testdata/benchmark_expectations +++ b/pkg/bench/rttanalysis/testdata/benchmark_expectations @@ -21,9 +21,9 @@ exp,benchmark 12,AlterTableDropConstraint/alter_table_drop_1_check_constraint 12,AlterTableDropConstraint/alter_table_drop_2_check_constraints 12,AlterTableDropConstraint/alter_table_drop_3_check_constraints -10,AlterTableSplit/alter_table_split_at_1_value -15,AlterTableSplit/alter_table_split_at_2_values -20,AlterTableSplit/alter_table_split_at_3_values +9,AlterTableSplit/alter_table_split_at_1_value +13,AlterTableSplit/alter_table_split_at_2_values +17,AlterTableSplit/alter_table_split_at_3_values 7,AlterTableUnsplit/alter_table_unsplit_at_1_value 9,AlterTableUnsplit/alter_table_unsplit_at_2_values 11,AlterTableUnsplit/alter_table_unsplit_at_3_values diff --git a/pkg/kv/kvserver/rangelog/BUILD.bazel b/pkg/kv/kvserver/rangelog/BUILD.bazel index dc38e80027dd..22ce5f68d888 100644 --- a/pkg/kv/kvserver/rangelog/BUILD.bazel +++ b/pkg/kv/kvserver/rangelog/BUILD.bazel @@ -7,11 +7,13 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangelog", visibility = ["//visibility:public"], deps = [ + "//pkg/keys", "//pkg/kv", "//pkg/kv/kvserver/kvserverpb", - "//pkg/security/username", - "//pkg/sql/sessiondata", - "//pkg/sql/sqlutil", + "//pkg/sql/catalog", + "//pkg/sql/catalog/systemschema", + "//pkg/sql/rowenc", + "//pkg/sql/sem/tree", "@com_github_cockroachdb_errors//:errors", ], ) @@ -20,6 +22,7 @@ go_test( name = "rangelog_test", srcs = [ "helpers_test.go", + "internal_executor_writer_test.go", "main_test.go", "rangelog_test.go", ], @@ -30,14 +33,19 @@ go_test( deps = [ "//pkg/base", "//pkg/kv", + "//pkg/kv/kvserver", + "//pkg/kv/kvserver/kvserverpb", "//pkg/kv/kvserver/rangelog/internal/rangelogtestpb", "//pkg/security/securityassets", "//pkg/security/securitytest", + "//pkg/security/username", "//pkg/server", + "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/systemschema", "//pkg/sql/catalog/tabledesc", "//pkg/sql/sem/tree", + "//pkg/sql/sessiondata", "//pkg/sql/sqlutil", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", @@ -46,6 +54,7 @@ go_test( "//pkg/util/leaktest", "//pkg/util/protoutil", "//pkg/util/randutil", + "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/kv/kvserver/rangelog/internal_executor_writer_test.go b/pkg/kv/kvserver/rangelog/internal_executor_writer_test.go new file mode 100644 index 000000000000..ef6cd8bce742 --- /dev/null +++ b/pkg/kv/kvserver/rangelog/internal_executor_writer_test.go @@ -0,0 +1,86 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangelog + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/errors" +) + +// InternalExecutorWriter implements kvserver.RangeLogWriter +// using the InternalExecutor. +type InternalExecutorWriter struct { + generateUniqueID func() int64 + ie sqlutil.InternalExecutor + insertQuery string +} + +// NewInternalExecutorWriter returns a new InternalExecutorWriter which +// implements kvserver.RangeLogWriter using the InternalExecutor. +func NewInternalExecutorWriter( + generateUniqueID func() int64, ie sqlutil.InternalExecutor, tableName string, +) *InternalExecutorWriter { + return &InternalExecutorWriter{ + generateUniqueID: generateUniqueID, + ie: ie, + insertQuery: fmt.Sprintf(` + INSERT INTO %s ( + timestamp, "rangeID", "storeID", "eventType", "otherRangeID", info, "uniqueID" + ) + VALUES( + $1, $2, $3, $4, $5, $6, $7 + ) + `, tableName), + } +} + +func (s *InternalExecutorWriter) WriteRangeLogEvent( + ctx context.Context, txn *kv.Txn, event kvserverpb.RangeLogEvent, +) error { + args := []interface{}{ + event.Timestamp, + event.RangeID, + event.StoreID, + event.EventType.String(), + nil, // otherRangeID + nil, // info + s.generateUniqueID(), + } + if event.OtherRangeID != 0 { + args[4] = event.OtherRangeID + } + if event.Info != nil { + infoBytes, err := json.Marshal(*event.Info) + if err != nil { + return err + } + args[5] = string(infoBytes) + } + + rows, err := s.ie.ExecEx(ctx, "log-range-event", txn, + sessiondata.InternalExecutorOverride{User: username.RootUserName()}, + s.insertQuery, args...) + if err != nil { + return err + } + if rows != 1 { + return errors.Errorf("%d rows affected by log insertion; expected exactly one row affected.", rows) + } + return nil +} diff --git a/pkg/kv/kvserver/rangelog/rangelog.go b/pkg/kv/kvserver/rangelog/rangelog.go index c65f4e150304..1c3fc6b26c2d 100644 --- a/pkg/kv/kvserver/rangelog/rangelog.go +++ b/pkg/kv/kvserver/rangelog/rangelog.go @@ -14,43 +14,44 @@ package rangelog import ( "context" "encoding/json" - "fmt" + "time" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" - "github.com/cockroachdb/cockroach/pkg/security/username" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" - "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/errors" ) // Writer implements kvserver.RangeLogWriter using the InternalExecutor. type Writer struct { - generateUniqueID func() int64 - ie sqlutil.InternalExecutor - insertQuery string + generateUniqueID IDGen + codec keys.SQLCodec + tableDesc catalog.TableDescriptor + primaryIndex catalog.Index + tableColMap catalog.TableColMap } +// IDGen is used to generate a unique ID for new rows. +type IDGen = func() int64 + // NewWriter returns a new Writer which implements kvserver.RangeLogWriter -// using the InternalExecutor. -func NewWriter(generateUniqueID func() int64, ie sqlutil.InternalExecutor) *Writer { - return newWriter(generateUniqueID, ie, "system.rangelog") +// using just kv APIs. The IDGen function must return unique identifiers +// every time it is called. +func NewWriter(codec keys.SQLCodec, generateUniqueID IDGen) *Writer { + return newWriter(codec, generateUniqueID, systemschema.RangeEventTable) } -func newWriter( - generateUniqueID func() int64, ie sqlutil.InternalExecutor, tableName string, -) *Writer { +func newWriter(codec keys.SQLCodec, id IDGen, table catalog.TableDescriptor) *Writer { return &Writer{ - generateUniqueID: generateUniqueID, - ie: ie, - insertQuery: fmt.Sprintf(` - INSERT INTO %s ( - timestamp, "rangeID", "storeID", "eventType", "otherRangeID", info, "uniqueID" - ) - VALUES( - $1, $2, $3, $4, $5, $6, $7 - ) - `, tableName), + generateUniqueID: id, + codec: codec, + tableDesc: table, + primaryIndex: table.GetPrimaryIndex(), + tableColMap: catalog.ColumnIDToOrdinalMap(table.PublicColumns()), } } @@ -59,34 +60,48 @@ func newWriter( func (s *Writer) WriteRangeLogEvent( ctx context.Context, txn *kv.Txn, event kvserverpb.RangeLogEvent, ) error { - args := []interface{}{ - event.Timestamp, - event.RangeID, - event.StoreID, - event.EventType.String(), - nil, // otherRangeID - nil, // info - s.generateUniqueID(), + ts, err := tree.MakeDTimestampTZ(event.Timestamp, time.Microsecond) + if err != nil { + return errors.AssertionFailedf("failed to generate event timestamp"+ + "from go time: %v", ts) + } + args := [...]tree.Datum{ + ts, + tree.NewDInt(tree.DInt(event.RangeID)), + tree.NewDInt(tree.DInt(event.StoreID)), + tree.NewDString(event.EventType.String()), + tree.DNull, + tree.DNull, + tree.NewDInt(tree.DInt(s.generateUniqueID())), } if event.OtherRangeID != 0 { - args[4] = event.OtherRangeID + args[4] = tree.NewDInt(tree.DInt(event.OtherRangeID)) } if event.Info != nil { infoBytes, err := json.Marshal(*event.Info) if err != nil { - return err + return errors.NewAssertionErrorWithWrappedErrf( + err, "failed to encode rangelog event info", + ) } - args[5] = string(infoBytes) + args[5] = tree.NewDString(string(infoBytes)) } - - rows, err := s.ie.ExecEx(ctx, "log-range-event", txn, - sessiondata.InternalExecutorOverride{User: username.RootUserName()}, - s.insertQuery, args...) + entries, err := rowenc.EncodePrimaryIndex( + s.codec, + s.tableDesc, + s.primaryIndex, + s.tableColMap, + args[:], + false, // includeEmpty + ) if err != nil { - return err + return errors.NewAssertionErrorWithWrappedErrf( + err, "failed to encode rangelog index entries", + ) } - if rows != 1 { - return errors.Errorf("%d rows affected by log insertion; expected exactly one row affected.", rows) + ba := txn.NewBatch() + for i := range entries { + ba.Put(entries[i].Key, &entries[i].Value) } - return nil + return txn.Run(ctx, ba) } diff --git a/pkg/kv/kvserver/rangelog/rangelog_test.go b/pkg/kv/kvserver/rangelog/rangelog_test.go index 340f98a89c83..c5325407f398 100644 --- a/pkg/kv/kvserver/rangelog/rangelog_test.go +++ b/pkg/kv/kvserver/rangelog/rangelog_test.go @@ -19,13 +19,15 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangelog" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangelog/internal/rangelogtestpb" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -43,7 +45,12 @@ import ( //go:embed testdata/rangelog.bin var encodedRangeLogData []byte -func TestRangeLogRoundTrips(t *testing.T) { +// TestRangeLog tests the RangeLogWriter implementation by ensuring that +// a representative set of events (encoded in a testdata file) can be +// round-tripped through the system, read from sql, and written to the +// key-value store in the same way that a legacy, internal executor-backed +// implementation would. +func TestRangeLog(t *testing.T) { defer leaktest.AfterTest(t)() // We're going to test that the data we have stored as encoded protobuf @@ -58,37 +65,72 @@ func TestRangeLogRoundTrips(t *testing.T) { s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(ctx) - // Inject a table to write into. + // Inject two tables to write into. The first table will be written into by + // kv-based implementation. The second table will be written into by the + // internal-executor based implementation. We'll then ensure that the data + // written by the two are the same. tdb := sqlutils.MakeSQLRunner(sqlDB) - tn := tree.MakeTableNameWithSchema("defaultdb", "public", "rangelog") - injectRangelogTable(t, tdb, tn) + tn1 := tree.MakeTableNameWithSchema("defaultdb", "public", "rangelog") + td1 := injectRangelogTable(t, tdb, tn1) + tn2 := tree.MakeTableNameWithSchema("defaultdb", "public", "rangelog2") + td2 := injectRangelogTable(t, tdb, tn2) // Write the data. - ie := s.InternalExecutor().(sqlutil.InternalExecutor) - require.NoError(t, insertRangeLogData(ctx, kvDB, ie, tn, &rangeLogData)) + ec := s.ExecutorConfig().(sql.ExecutorConfig) + codec := ec.Codec + ie := ec.InternalExecutor + mkWriter := func(genID func() int64) kvserver.RangeLogWriter { + genA, genB := makeTeeIDGen(genID) + return &teeWriter{ + a: rangelog.NewTestWriter(codec, genA, td1), + b: rangelog.NewInternalExecutorWriter(genB, ie, tn2.String()), + } + } + require.NoError(t, insertRangeLogData(ctx, kvDB, mkWriter, &rangeLogData)) - // Validate that it round-trips. - got := tdb.QueryStr(t, "SELECT * FROM "+tn.String()) - after, err := rangelogtestpb.ParseRows(got) - require.NoError(t, err) - require.Equal(t, &rangeLogData, after) + // Ensure that the data written to both tables is identical except for the + // key prefix and checksum. + const rawKVsWithoutPrefix = ` +SELECT crdb_internal.pretty_key(key, 1), + substring(encode(val, 'hex') from 9) -- strip the checksum + FROM crdb_internal.scan(crdb_internal.index_span($1, 1)) as t(key, val)` + require.Equal(t, + tdb.QueryStr(t, rawKVsWithoutPrefix, td2.GetID()), + tdb.QueryStr(t, rawKVsWithoutPrefix, td1.GetID())) + + // Validate that the data can be read from SQL. + checkDataRoundTrips := func(tn tree.TableName) { + beforeEncoded, err := protoutil.Marshal(&rangeLogData) + require.NoError(t, err) + got := tdb.QueryStr(t, "SELECT * FROM "+tn.String()) + after, err := rangelogtestpb.ParseRows(got) + require.NoError(t, err) + afterEncoded, err := protoutil.Marshal(after) + require.NoError(t, err) + require.Equal(t, beforeEncoded, afterEncoded) + } + checkDataRoundTrips(tn1) + checkDataRoundTrips(tn2) } -// insertRangeLogData transactionally inserts the provided rangeLogData. +type idGen = rangelog.IDGen + +// insertRangeLogData inserts the provided rangeLogData in a transaction. func insertRangeLogData( ctx context.Context, kvDB *kv.DB, - ie sqlutil.InternalExecutor, - tn tree.TableName, + c func(gen idGen) kvserver.RangeLogWriter, rangeLogData *rangelogtestpb.RangeLogData, ) error { - doInserts := func() (isRestart bool, _ error) { + makeIDGen := func() idGen { var offset int - genID := func() int64 { + return func() int64 { defer func() { offset++ }() return rangeLogData.UniqueIds[offset] } - w := rangelog.NewTestWriter(genID, ie, tn.String()) + } + doInserts := func() (isRestart bool, _ error) { + w := c(makeIDGen()) var called bool errRestart := errors.New("restart") err := kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { @@ -177,3 +219,36 @@ SELECT "parentID", "parentSchemaID", id, crdb_internal_mvcc_timestamp clone.ID, data) return clone.ImmutableCopy().(catalog.TableDescriptor) } + +// makeTeeIDGen takes a function which returns an integer and +// returns two functions, each of which will return the same +// sequence of integers. +func makeTeeIDGen(id idGen) (genA, genB idGen) { + var a, b []int64 + makeGen := func(s *[]int64) func() int64 { + return func() (ret int64) { + if len(*s) == 0 { + v := id() + a, b = append(a, v), append(b, v) + } + ret, (*s) = (*s)[0], (*s)[1:] + return ret + } + } + return makeGen(&a), makeGen(&b) +} + +// teeWriter writes all entries to both a and b. If an error occurs writing to +// a, no write to b is attempted. +type teeWriter struct { + a, b kvserver.RangeLogWriter +} + +func (t teeWriter) WriteRangeLogEvent( + ctx context.Context, txn *kv.Txn, event kvserverpb.RangeLogEvent, +) error { + if err := t.a.WriteRangeLogEvent(ctx, txn, event); err != nil { + return err + } + return t.b.WriteRangeLogEvent(ctx, txn, event) +} diff --git a/pkg/server/server.go b/pkg/server/server.go index a7b8b5c63c7e..ca61c8056077 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -639,12 +639,12 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { } rangeLogWriter := rangelog.NewWriter( + keys.SystemSQLCodec, func() int64 { return int64(builtins.GenerateUniqueInt( builtins.ProcessUniqueID(nodeIDContainer.Get()), )) }, - internalExecutor, ) storeCfg := kvserver.StoreConfig{ DefaultSpanConfig: cfg.DefaultZoneConfig.AsSpanConfig(),