93610: upgrades: fix desc ID sequence definition to match expectation r=ajwerner a=ajwerner

Before this change, the test fails with:

```
Diff:
@@ -29,11 +29,11 @@
 	"lastUpdated" TIMESTAMP NOT NULL DEFAULT now():::TIMESTAMP,
 	"valueType" STRING NULL,
 	CONSTRAINT "primary" PRIMARY KEY (name ASC),
 	FAMILY "fam_0_name_value_lastUpdated_valueType" (name, value, "lastUpdated", "valueType")
 );
-CREATE SEQUENCE public.descriptor_id_seq MINVALUE 1 MAXVALUE 9223372036854775807 INCREMENT 1 START 1;
+CREATE SEQUENCE public.descriptor_id_seq MINVALUE 1 MAXVALUE 9223372036854775807 INCREMENT 1 START 104;
 CREATE TABLE public.tenants (
 	id INT8 NOT NULL,
 	active BOOL NOT NULL DEFAULT true,
 	info BYTES NULL,
 	name STRING NULL AS (crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo':::STRING, info)->>'name':::STRING) VIRTUAL,
----

```

This was revealed by #93487, which fixed the broken roachtest. Expect a few more of these.
Fixes: #93602

Release note: None
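
The failing assertion compares the system schema of a freshly bootstrapped cluster against the schema after a version upgrade, so the descriptor ID sequence must be defined consistently on both paths; its START value reflects descriptor IDs already allocated (104 rather than 1 in the diff above). A quick way to inspect the definition by hand is sketched below; the connection string is hypothetical:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq"
)

func main() {
	// Hypothetical connection string; adjust for your cluster.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/system?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// SHOW CREATE reveals the sequence definition, including the START
	// value that the upgrade test's expected output must match.
	var name, create string
	row := db.QueryRow(`SHOW CREATE SEQUENCE system.descriptor_id_seq`)
	if err := row.Scan(&name, &create); err != nil {
		log.Fatal(err)
	}
	fmt.Println(create)
}
```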

93619: ui: preserve backwards compatibility for managed-service r=matthewtodd a=matthewtodd

These components were renamed in #91860, but the managed-service support for loading multiple versions of cluster-ui doesn't tolerate renames: it expects changes to be purely additive.

We choose here to provide aliases to the old names, rather than undoing the rename completely, so that we can eventually switch managed-service over to the new names once the old versions have fallen out of the support window.

Epic: none
Release note: None
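
The aliasing approach described above keeps the change purely additive: old names keep resolving alongside the new ones. For illustration only, the same pattern sketched in Go with type aliases (the actual cluster-ui change is TypeScript, and all names below are hypothetical):

```go
package clusterui

// RecentStatementsPage stands in for a component's new, post-rename
// name (hypothetical).
type RecentStatementsPage struct{}

// ActiveStatementsPage aliases the old name so existing consumers keep
// working; it can be dropped once the old versions fall out of the
// support window (hypothetical name).
type ActiveStatementsPage = RecentStatementsPage
```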

93622: bulkpb: move backuppb.IngestionPerformanceStats out of ccl r=ajwerner a=ajwerner

This message was relied on by kv code, but nothing consumed it at its old path; it is only used for trace events. This PR just moves its code from `ccl/backupccl/backuppb` to `kv/bulk/bulkpb`.

Part of #91714

Epic: none

Release note: None
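
For downstream Go code, the move only changes the import path; the message keeps its name. A minimal sketch, assuming no API changes beyond the package path:

```go
package example

import (
	// Before this PR the message lived under the CCL tree:
	//   "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
	// After the move it is imported from kv/bulk:
	"github.com/cockroachdb/cockroach/pkg/kv/bulk/bulkpb"
)

// newStats allocates the moved message at its new path.
func newStats() *bulkpb.IngestionPerformanceStats {
	return &bulkpb.IngestionPerformanceStats{}
}
```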

Co-authored-by: Andrew Werner <[email protected]>
Co-authored-by: Matthew Todd <[email protected]>
Committed by 3 people on Dec 14, 2022 · 4 parents: a30fb14, 691e50b, 9f024e6, dee89f1 · commit 7e0ecb6
Showing 19 changed files with 424 additions and 308 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
@@ -1105,6 +1105,7 @@ GO_TARGETS = [
"//pkg/keys:keys",
"//pkg/keys:keys_test",
"//pkg/keysbase:keysbase",
"//pkg/kv/bulk/bulkpb:bulkpb",
"//pkg/kv/bulk:bulk",
"//pkg/kv/bulk:bulk_test",
"//pkg/kv/kvbase:kvbase",
@@ -2482,6 +2483,7 @@ GET_X_DATA_TARGETS = [
"//pkg/keysbase:get_x_data",
"//pkg/kv:get_x_data",
"//pkg/kv/bulk:get_x_data",
"//pkg/kv/bulk/bulkpb:get_x_data",
"//pkg/kv/kvbase:get_x_data",
"//pkg/kv/kvclient:get_x_data",
"//pkg/kv/kvclient/kvcoord:get_x_data",
4 changes: 0 additions & 4 deletions pkg/ccl/backupccl/backuppb/BUILD.bazel
@@ -44,17 +44,13 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/cloud",
"//pkg/roachpb",
"//pkg/sql/catalog/descpb",
"//pkg/sql/parser",
"//pkg/sql/protoreflect",
"//pkg/sql/sem/tree",
"//pkg/util/bulk",
"//pkg/util/humanizeutil",
"//pkg/util/log",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//jsonpb",
"@io_opentelemetry_go_otel//attribute",
],
217 changes: 0 additions & 217 deletions pkg/ccl/backupccl/backuppb/backup.go
@@ -9,25 +9,17 @@
package backuppb

import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/protoreflect"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/bulk"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
_ "github.com/cockroachdb/cockroach/pkg/util/uuid" // required for backup.proto
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/gogo/protobuf/jsonpb"
"go.opentelemetry.io/otel/attribute"
)
@@ -186,215 +178,6 @@ func (e *ExportStats) Tag() string {
return "ExportStats"
}

// Identity implements the TracingAggregatorEvent interface.
func (s *IngestionPerformanceStats) Identity() bulk.TracingAggregatorEvent {
stats := IngestionPerformanceStats{}
stats.SendWaitByStore = make(map[roachpb.StoreID]time.Duration)
return &stats
}

// Combine implements the TracingAggregatorEvent interface.
func (s *IngestionPerformanceStats) Combine(other bulk.TracingAggregatorEvent) {
otherStats, ok := other.(*IngestionPerformanceStats)
if !ok {
panic(fmt.Sprintf("`other` is not of type IngestionPerformanceStats: %T", other))
}

s.DataSize += otherStats.DataSize
s.BufferFlushes += otherStats.BufferFlushes
s.FlushesDueToSize += otherStats.FlushesDueToSize
s.Batches += otherStats.Batches
s.BatchesDueToRange += otherStats.BatchesDueToRange
s.BatchesDueToSize += otherStats.BatchesDueToSize
s.SplitRetries += otherStats.SplitRetries
s.Splits += otherStats.Splits
s.Scatters += otherStats.Scatters
s.ScatterMoved += otherStats.ScatterMoved
s.FillWait += otherStats.FillWait
s.SortWait += otherStats.SortWait
s.FlushWait += otherStats.FlushWait
s.BatchWait += otherStats.BatchWait
s.SendWait += otherStats.SendWait
s.SplitWait += otherStats.SplitWait
s.ScatterWait += otherStats.ScatterWait
s.CommitWait += otherStats.CommitWait
s.Duration += otherStats.Duration

for k, v := range otherStats.SendWaitByStore {
s.SendWaitByStore[k] += v
}
}

// Tag implements the TracingAggregatorEvent interface.
func (s *IngestionPerformanceStats) Tag() string {
return "IngestionPerformanceStats"
}

// Render implements the TracingAggregatorEvent interface.
func (s *IngestionPerformanceStats) Render() []attribute.KeyValue {
const mb = 1 << 20
tags := make([]attribute.KeyValue, 0)
if s.Batches > 0 {
tags = append(tags,
attribute.KeyValue{
Key: "num_batches",
Value: attribute.Int64Value(s.Batches),
},
attribute.KeyValue{
Key: "num_batches_due_to_size",
Value: attribute.Int64Value(s.BatchesDueToSize),
},
attribute.KeyValue{
Key: "num_batches_due_to_range",
Value: attribute.Int64Value(s.BatchesDueToRange),
},
attribute.KeyValue{
Key: "split_retires",
Value: attribute.Int64Value(s.SplitRetries),
},
)
}

if s.BufferFlushes > 0 {
tags = append(tags,
attribute.KeyValue{
Key: "num_flushes",
Value: attribute.Int64Value(s.BufferFlushes),
},
attribute.KeyValue{
Key: "num_flushes_due_to_size",
Value: attribute.Int64Value(s.FlushesDueToSize),
},
)
}

if s.DataSize > 0 {
dataSizeMB := float64(s.DataSize) / mb
tags = append(tags, attribute.KeyValue{
Key: "data_size",
Value: attribute.StringValue(fmt.Sprintf("%.2f MB", dataSizeMB)),
})

if s.Duration > 0 {
throughput := dataSizeMB / s.Duration.Seconds()
tags = append(tags, attribute.KeyValue{
Key: "throughput",
Value: attribute.StringValue(fmt.Sprintf("%.2f MB/s", throughput)),
})
}
}

tags = append(tags,
timeKeyValue("fill_wait", s.FillWait),
timeKeyValue("sort_wait", s.SortWait),
timeKeyValue("flush_wait", s.FlushWait),
timeKeyValue("batch_wait", s.BatchWait),
timeKeyValue("send_wait", s.SendWait),
timeKeyValue("split_wait", s.SplitWait),
attribute.KeyValue{Key: "splits", Value: attribute.Int64Value(s.Splits)},
timeKeyValue("scatter_wait", s.ScatterWait),
attribute.KeyValue{Key: "scatters", Value: attribute.Int64Value(s.Scatters)},
attribute.KeyValue{Key: "scatter_moved", Value: attribute.Int64Value(s.ScatterMoved)},
timeKeyValue("commit_wait", s.CommitWait),
)

// Sort store send wait by IDs before adding them as tags.
ids := make(roachpb.StoreIDSlice, 0, len(s.SendWaitByStore))
for i := range s.SendWaitByStore {
ids = append(ids, i)
}
sort.Sort(ids)
for _, id := range ids {
tags = append(tags, timeKeyValue(attribute.Key(fmt.Sprintf("store-%d_send_wait", id)), s.SendWaitByStore[id]))
}

return tags
}

func timeKeyValue(key attribute.Key, time time.Duration) attribute.KeyValue {
return attribute.KeyValue{
Key: key,
Value: attribute.StringValue(string(humanizeutil.Duration(time))),
}
}

// LogTimings logs the timing ingestion stats.
func (s *IngestionPerformanceStats) LogTimings(ctx context.Context, name, action string) {
log.Infof(ctx,
"%s adder %s; ingested %s: %s filling; %v sorting; %v / %v flushing; %v sending; %v splitting; %d; %v scattering, %d, %v; %v commit-wait",
name,
redact.Safe(action),
sz(s.DataSize),
timing(s.FillWait),
timing(s.SortWait),
timing(s.FlushWait),
timing(s.BatchWait),
timing(s.SendWait),
timing(s.SplitWait),
s.Splits,
timing(s.ScatterWait),
s.Scatters,
s.ScatterMoved,
timing(s.CommitWait),
)
}

// LogFlushes logs stats about buffering added and SST batcher flushes.
func (s *IngestionPerformanceStats) LogFlushes(
ctx context.Context, name, action string, bufSize int64, span roachpb.Span,
) {
log.Infof(ctx,
"%s adder %s; flushed into %s %d times, %d due to buffer size (%s); flushing chunked into %d files (%d for ranges, %d for sst size) +%d split-retries",
name,
redact.Safe(action),
span,
s.BufferFlushes,
s.FlushesDueToSize,
sz(bufSize),
s.Batches,
s.BatchesDueToRange,
s.BatchesDueToSize,
s.SplitRetries,
)
}

// LogPerStoreTimings logs send waits per store.
func (s *IngestionPerformanceStats) LogPerStoreTimings(ctx context.Context, name string) {
if len(s.SendWaitByStore) == 0 {
return
}
ids := make(roachpb.StoreIDSlice, 0, len(s.SendWaitByStore))
for i := range s.SendWaitByStore {
ids = append(ids, i)
}
sort.Sort(ids)

var sb strings.Builder
for i, id := range ids {
// Hack: when store IDs are not contiguous, insert a zero-duration
// placeholder for the missing predecessor so the gap shows up as an
// explicit zero in the output.
if i > 0 && ids[i-1] != id-1 {
s.SendWaitByStore[id-1] = 0
fmt.Fprintf(&sb, "%d: %s;", id-1, timing(0))
}
fmt.Fprintf(&sb, "%d: %s;", id, timing(s.SendWaitByStore[id]))

}
log.Infof(ctx, "%s waited on sending to: %s", name, redact.Safe(sb.String()))
}

type sz int64

func (b sz) String() string { return string(humanizeutil.IBytes(int64(b))) }
func (b sz) SafeValue() {}

type timing time.Duration

func (t timing) String() string { return time.Duration(t).Round(time.Second).String() }
func (t timing) SafeValue() {}

var _ bulk.TracingAggregatorEvent = &IngestionPerformanceStats{}

func init() {
protoreflect.RegisterShorthands((*BackupManifest)(nil), "backup", "backup_manifest")
}
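
The removed methods above implement the `bulk.TracingAggregatorEvent` interface from `pkg/util/bulk`: an aggregator keeps one running event per tag and folds incoming events into it via `Identity` and `Combine`. A minimal sketch of that consumption pattern, assuming only the interface methods shown above (the `aggregate` helper itself is hypothetical):

```go
package example

import (
	"github.com/cockroachdb/cockroach/pkg/util/bulk"
)

// aggregate folds a stream of events into one running total per tag,
// the way a tracing aggregator would consume the interface.
func aggregate(events []bulk.TracingAggregatorEvent) map[string]bulk.TracingAggregatorEvent {
	totals := make(map[string]bulk.TracingAggregatorEvent)
	for _, ev := range events {
		tag := ev.Tag()
		if _, ok := totals[tag]; !ok {
			// Identity returns a zero event of the same concrete type,
			// with any maps initialized so Combine can write into them.
			totals[tag] = ev.Identity()
		}
		totals[tag].Combine(ev)
	}
	return totals
}
```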
64 changes: 0 additions & 64 deletions pkg/ccl/backupccl/backuppb/backup.proto
@@ -211,67 +211,3 @@ message ExportStats {
// ExportResponse and push the response on a channel.
int64 duration = 3 [(gogoproto.casttype) = "time.Duration"];
}

// IngestionPerformanceStats is a message containing information about the
// creation of SSTables by an SSTBatcher or BufferingAdder.
message IngestionPerformanceStats {
// DataSize is the total byte size of all the SST files ingested.
int64 data_size = 1;

// BufferFlushes is the number of buffer flushes.
int64 buffer_flushes = 2;

// FlushesDueToSize is the number of buffer flushes due to buffer size.
int64 flushes_due_to_size = 3;

// Batches is the number of batches (addsstable calls) sent.
int64 batches = 4;

// BatchesDueToRange is the number of batches due to range bounds.
int64 batches_due_to_range = 5;

// BatchesDueToSize is the number of batches due to batch size.
int64 batches_due_to_size = 6;

// SplitRetries is the number of extra sub-batches created due to unexpected
// splits.
int64 split_retries = 7;

// Splits is the number of splits sent.
int64 splits = 8;

// Scatters is the number of scatters sent.
int64 scatters = 9;

// ScatterMoved is the total size in bytes moved by scatter calls.
int64 scatter_moved = 10;

// FillWait is the time spent between buffer flushes.
int64 fill_wait = 11 [(gogoproto.casttype) = "time.Duration"];

// SortWait is the time spent sorting buffers.
int64 sort_wait = 12 [(gogoproto.casttype) = "time.Duration"];

// FlushWait is the time spent flushing buffers.
int64 flush_wait = 13 [(gogoproto.casttype) = "time.Duration"];

// BatchWait is the time spent flushing batches (inc split/scatter/send).
int64 batch_wait = 14 [(gogoproto.casttype) = "time.Duration"];

// SendWait is the time spent sending batches (addsstable+retries)
int64 send_wait = 15 [(gogoproto.casttype) = "time.Duration"];

// SplitWait is the time spent splitting.
int64 split_wait = 16 [(gogoproto.casttype) = "time.Duration"];
// ScatterWait is the time spent scattering.
int64 scatter_wait = 17 [(gogoproto.casttype) = "time.Duration"];

// CommitWait is the time spent waiting for commit timestamps.
int64 commit_wait = 18 [(gogoproto.casttype) = "time.Duration"];

// Duration is the total ingestion time.
int64 duration = 19 [(gogoproto.casttype) = "time.Duration"];

// SendWaitByStore is the time spent sending batches to each store.
map<int32, int64> send_wait_by_store = 20 [(gogoproto.castkey) = "github.com/cockroachdb/cockroach/pkg/roachpb.StoreID", (gogoproto.castvalue) = "time.Duration"];
}
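
The gogoproto options above mean the generated Go struct does not expose raw int64s: `casttype` renders the wait fields as `time.Duration`, and `castkey`/`castvalue` type the per-store map. Roughly, the generated fields look like the sketch below (not the exact generated code; field tags and most fields omitted):

```go
package bulkpb

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// ingestionPerformanceStatsSketch approximates the shape gogoproto
// generates for IngestionPerformanceStats.
type ingestionPerformanceStatsSketch struct {
	DataSize int64
	// casttype turns the int64 duration fields into time.Duration.
	FillWait time.Duration
	SendWait time.Duration
	// castkey/castvalue shape the map's key and value types.
	SendWaitByStore map[roachpb.StoreID]time.Duration
}
```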
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
@@ -247,6 +247,7 @@ go_library(
"@com_github_kr_pretty//:pretty",
"@com_github_lib_pq//:pq",
"@com_github_montanaflynn_stats//:stats",
"@com_github_pmezard_go_difflib//difflib",
"@com_github_prometheus_client_golang//api",
"@com_github_prometheus_client_golang//api/prometheus/v1:prometheus",
"@com_github_prometheus_common//model",
@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/pmezard/go-difflib/difflib"
)

func registerValidateSystemSchemaAfterVersionUpgrade(r registry.Registry) {
@@ -85,8 +86,18 @@ func registerValidateSystemSchemaAfterVersionUpgrade(r registry.Registry) {
validateEquivalenceStep := func(str1, str2 *string) versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
if *str1 != *str2 {
t.Fatal("After upgrading, `USE system; SHOW CREATE ALL TABLES;` " +
"does not match expected output after version upgrade.\n")
diff, diffErr := difflib.GetUnifiedDiffString(difflib.UnifiedDiff{
A: difflib.SplitLines(*str1),
B: difflib.SplitLines(*str2),
Context: 5,
})
if diffErr != nil {
diff = diffErr.Error()
t.Errorf("failed to produce diff: %v", diffErr)
}
t.Fatalf("After upgrading, `USE system; SHOW CREATE ALL TABLES;` "+
"does not match expected output after version upgrade."+
"\nDiff:\n%s", diff)
}
t.L().Printf("validating succeeded:\n%v", *str1)
}
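The new failure path renders a unified diff of the two schema dumps instead of a bare error message. Outside the roachtest harness, the same `difflib` call can be exercised directly; the inputs and file labels below are illustrative:

```go
package main

import (
	"fmt"
	"log"

	"github.com/pmezard/go-difflib/difflib"
)

func main() {
	expected := "CREATE SEQUENCE public.descriptor_id_seq ... START 1;\n"
	actual := "CREATE SEQUENCE public.descriptor_id_seq ... START 104;\n"

	// GetUnifiedDiffString renders a classic unified diff with the
	// requested number of context lines, as in the roachtest change.
	diff, err := difflib.GetUnifiedDiffString(difflib.UnifiedDiff{
		A:        difflib.SplitLines(expected),
		B:        difflib.SplitLines(actual),
		FromFile: "expected",
		ToFile:   "actual",
		Context:  5,
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Print(diff)
}
```
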
1 change: 1 addition & 0 deletions pkg/gen/protobuf.bzl
@@ -24,6 +24,7 @@ PROTOBUF_SRCS = [
"//pkg/geo/geopb:geopb_go_proto",
"//pkg/gossip:gossip_go_proto",
"//pkg/jobs/jobspb:jobspb_go_proto",
"//pkg/kv/bulk/bulkpb:bulkpb_go_proto",
"//pkg/kv/kvnemesis:kvnemesis_go_proto",
"//pkg/kv/kvserver/closedts/ctpb:ctpb_go_proto",
"//pkg/kv/kvserver/concurrency/lock:lock_go_proto",
2 changes: 1 addition & 1 deletion pkg/kv/bulk/BUILD.bazel
@@ -13,9 +13,9 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/bulk",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/backupccl/backuppb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/bulk/bulkpb",
"//pkg/kv/kvclient/rangecache",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
(Diff truncated; the remaining changed files are not shown.)