Skip to content

Commit

Permalink
kvserver/rangelog: switch implementation to use raw KVs
Browse files Browse the repository at this point in the history
This commit swaps out the old internal executor based implementation with one
which hard-codes its understanding of the rangelog table and uses the KV API
directly.

Release note: None
  • Loading branch information
ajwerner committed Oct 11, 2022
1 parent 07e92b8 commit 285e414
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 67 deletions.
6 changes: 3 additions & 3 deletions pkg/bench/rttanalysis/testdata/benchmark_expectations
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ exp,benchmark
12,AlterTableDropConstraint/alter_table_drop_1_check_constraint
12,AlterTableDropConstraint/alter_table_drop_2_check_constraints
12,AlterTableDropConstraint/alter_table_drop_3_check_constraints
10,AlterTableSplit/alter_table_split_at_1_value
15,AlterTableSplit/alter_table_split_at_2_values
20,AlterTableSplit/alter_table_split_at_3_values
9,AlterTableSplit/alter_table_split_at_1_value
13,AlterTableSplit/alter_table_split_at_2_values
17,AlterTableSplit/alter_table_split_at_3_values
7,AlterTableUnsplit/alter_table_unsplit_at_1_value
9,AlterTableUnsplit/alter_table_unsplit_at_2_values
11,AlterTableUnsplit/alter_table_unsplit_at_3_values
Expand Down
15 changes: 12 additions & 3 deletions pkg/kv/kvserver/rangelog/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangelog",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/security/username",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/sql/catalog",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand All @@ -20,6 +22,7 @@ go_test(
name = "rangelog_test",
srcs = [
"helpers_test.go",
"internal_executor_writer_test.go",
"main_test.go",
"rangelog_test.go",
],
Expand All @@ -30,14 +33,19 @@ go_test(
deps = [
"//pkg/base",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/rangelog/internal/rangelogtestpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
Expand All @@ -46,6 +54,7 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
Expand Down
86 changes: 86 additions & 0 deletions pkg/kv/kvserver/rangelog/internal_executor_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangelog

import (
"context"
"encoding/json"
"fmt"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/errors"
)

// InternalExecutorWriter implements kvserver.RangeLogWriter
// using the InternalExecutor.
type InternalExecutorWriter struct {
generateUniqueID func() int64
ie sqlutil.InternalExecutor
insertQuery string
}

// NewInternalExecutorWriter returns a new InternalExecutorWriter which
// implements kvserver.RangeLogWriter using the InternalExecutor.
func NewInternalExecutorWriter(
generateUniqueID func() int64, ie sqlutil.InternalExecutor, tableName string,
) *InternalExecutorWriter {
return &InternalExecutorWriter{
generateUniqueID: generateUniqueID,
ie: ie,
insertQuery: fmt.Sprintf(`
INSERT INTO %s (
timestamp, "rangeID", "storeID", "eventType", "otherRangeID", info, "uniqueID"
)
VALUES(
$1, $2, $3, $4, $5, $6, $7
)
`, tableName),
}
}

func (s *InternalExecutorWriter) WriteRangeLogEvent(
ctx context.Context, txn *kv.Txn, event kvserverpb.RangeLogEvent,
) error {
args := []interface{}{
event.Timestamp,
event.RangeID,
event.StoreID,
event.EventType.String(),
nil, // otherRangeID
nil, // info
s.generateUniqueID(),
}
if event.OtherRangeID != 0 {
args[4] = event.OtherRangeID
}
if event.Info != nil {
infoBytes, err := json.Marshal(*event.Info)
if err != nil {
return err
}
args[5] = string(infoBytes)
}

rows, err := s.ie.ExecEx(ctx, "log-range-event", txn,
sessiondata.InternalExecutorOverride{User: username.RootUserName()},
s.insertQuery, args...)
if err != nil {
return err
}
if rows != 1 {
return errors.Errorf("%d rows affected by log insertion; expected exactly one row affected.", rows)
}
return nil
}
99 changes: 57 additions & 42 deletions pkg/kv/kvserver/rangelog/rangelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,44 @@ package rangelog
import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/errors"
)

// Writer implements kvserver.RangeLogWriter using the InternalExecutor.
type Writer struct {
generateUniqueID func() int64
ie sqlutil.InternalExecutor
insertQuery string
generateUniqueID IDGen
codec keys.SQLCodec
tableDesc catalog.TableDescriptor
primaryIndex catalog.Index
tableColMap catalog.TableColMap
}

// IDGen is used to generate a unique ID for new rows.
type IDGen = func() int64

// NewWriter returns a new Writer which implements kvserver.RangeLogWriter
// using the InternalExecutor.
func NewWriter(generateUniqueID func() int64, ie sqlutil.InternalExecutor) *Writer {
return newWriter(generateUniqueID, ie, "system.rangelog")
// using just kv APIs. The IDGen function must return unique identifiers
// every time it is called.
func NewWriter(codec keys.SQLCodec, generateUniqueID IDGen) *Writer {
return newWriter(codec, generateUniqueID, systemschema.RangeEventTable)
}

func newWriter(
generateUniqueID func() int64, ie sqlutil.InternalExecutor, tableName string,
) *Writer {
func newWriter(codec keys.SQLCodec, id IDGen, table catalog.TableDescriptor) *Writer {
return &Writer{
generateUniqueID: generateUniqueID,
ie: ie,
insertQuery: fmt.Sprintf(`
INSERT INTO %s (
timestamp, "rangeID", "storeID", "eventType", "otherRangeID", info, "uniqueID"
)
VALUES(
$1, $2, $3, $4, $5, $6, $7
)
`, tableName),
generateUniqueID: id,
codec: codec,
tableDesc: table,
primaryIndex: table.GetPrimaryIndex(),
tableColMap: catalog.ColumnIDToOrdinalMap(table.PublicColumns()),
}
}

Expand All @@ -59,34 +60,48 @@ func newWriter(
func (s *Writer) WriteRangeLogEvent(
ctx context.Context, txn *kv.Txn, event kvserverpb.RangeLogEvent,
) error {
args := []interface{}{
event.Timestamp,
event.RangeID,
event.StoreID,
event.EventType.String(),
nil, // otherRangeID
nil, // info
s.generateUniqueID(),
ts, err := tree.MakeDTimestampTZ(event.Timestamp, time.Microsecond)
if err != nil {
return errors.AssertionFailedf("failed to generate event timestamp"+
"from go time: %v", ts)
}
args := [...]tree.Datum{
ts,
tree.NewDInt(tree.DInt(event.RangeID)),
tree.NewDInt(tree.DInt(event.StoreID)),
tree.NewDString(event.EventType.String()),
tree.DNull,
tree.DNull,
tree.NewDInt(tree.DInt(s.generateUniqueID())),
}
if event.OtherRangeID != 0 {
args[4] = event.OtherRangeID
args[4] = tree.NewDInt(tree.DInt(event.OtherRangeID))
}
if event.Info != nil {
infoBytes, err := json.Marshal(*event.Info)
if err != nil {
return err
return errors.NewAssertionErrorWithWrappedErrf(
err, "failed to encode rangelog event info",
)
}
args[5] = string(infoBytes)
args[5] = tree.NewDString(string(infoBytes))
}

rows, err := s.ie.ExecEx(ctx, "log-range-event", txn,
sessiondata.InternalExecutorOverride{User: username.RootUserName()},
s.insertQuery, args...)
entries, err := rowenc.EncodePrimaryIndex(
s.codec,
s.tableDesc,
s.primaryIndex,
s.tableColMap,
args[:],
false, // includeEmpty
)
if err != nil {
return err
return errors.NewAssertionErrorWithWrappedErrf(
err, "failed to encode rangelog index entries",
)
}
if rows != 1 {
return errors.Errorf("%d rows affected by log insertion; expected exactly one row affected.", rows)
ba := txn.NewBatch()
for i := range entries {
ba.Put(entries[i].Key, &entries[i].Value)
}
return nil
return txn.Run(ctx, ba)
}
Loading

0 comments on commit 285e414

Please sign in to comment.