Skip to content

Commit

Permalink
rangedesciter: carve out library for range desc iteration
Browse files Browse the repository at this point in the history
Informs #87503; pure code-movement. Going to use it in future commits as
part of multi-tenant replication reports (#89987) where we'll need to
iterate over the set of range descriptors.

Release note: None
  • Loading branch information
irfansharif committed Oct 14, 2022
1 parent 708b242 commit 75688c2
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 95 deletions.
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ ALL_TESTS = [
"//pkg/util/quantile:quantile_test",
"//pkg/util/quotapool:quotapool_test",
"//pkg/util/randutil:randutil_test",
"//pkg/util/rangedesciter:rangedesciter_test",
"//pkg/util/retry:retry_test",
"//pkg/util/ring:ring_test",
"//pkg/util/schedulerlatency:schedulerlatency_test",
Expand Down Expand Up @@ -2020,6 +2021,8 @@ GO_TARGETS = [
"//pkg/util/quotapool:quotapool_test",
"//pkg/util/randutil:randutil",
"//pkg/util/randutil:randutil_test",
"//pkg/util/rangedesciter:rangedesciter",
"//pkg/util/rangedesciter:rangedesciter_test",
"//pkg/util/retry:retry",
"//pkg/util/retry:retry_test",
"//pkg/util/ring:ring",
Expand Down Expand Up @@ -2994,6 +2997,7 @@ GET_X_DATA_TARGETS = [
"//pkg/util/quantile:get_x_data",
"//pkg/util/quotapool:get_x_data",
"//pkg/util/randutil:get_x_data",
"//pkg/util/rangedesciter:get_x_data",
"//pkg/util/retry:get_x_data",
"//pkg/util/ring:get_x_data",
"//pkg/util/schedulerlatency:get_x_data",
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ go_library(
"//pkg/util/pprofutil",
"//pkg/util/protoutil",
"//pkg/util/quotapool",
"//pkg/util/rangedesciter",
"//pkg/util/retry",
"//pkg/util/schedulerlatency",
"//pkg/util/stop",
Expand Down
8 changes: 5 additions & 3 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/netutil/addr"
"github.com/cockroachdb/cockroach/pkg/util/rangedesciter"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -1004,9 +1005,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
var systemDeps upgrade.SystemDeps
if codec.ForSystemTenant() {
c = upgradecluster.New(upgradecluster.ClusterConfig{
NodeLiveness: nodeLiveness,
Dialer: cfg.nodeDialer,
DB: cfg.db,
NodeLiveness: nodeLiveness,
Dialer: cfg.nodeDialer,
RangeDescIterator: rangedesciter.New(cfg.db),
DB: cfg.db,
})
systemDeps = upgrade.SystemDeps{
Cluster: c,
Expand Down
24 changes: 0 additions & 24 deletions pkg/upgrade/system_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,30 +94,6 @@ type Cluster interface {
// just be the `Migrate` request, with code added within [1] to do the
// specific things intended for the specified version.
//
// It's important to note that the closure is being executed in the context of
// a distributed transaction that may be automatically retried. So something
// like the following is an anti-pattern:
//
// processed := 0
// _ = h.IterateRangeDescriptors(...,
// func(descriptors ...roachpb.RangeDescriptor) error {
// processed += len(descriptors) // we'll over count if retried
// log.Infof(ctx, "processed %d ranges", processed)
// },
// )
//
// Instead we allow callers to pass in a callback to signal on every attempt
// (including the first). This lets us salvage the example above:
//
// var processed int
// init := func() { processed = 0 }
// _ = h.IterateRangeDescriptors(..., init,
// func(descriptors ...roachpb.RangeDescriptor) error {
// processed += len(descriptors)
// log.Infof(ctx, "processed %d ranges", processed)
// },
// )
//
// [1]: pkg/kv/kvserver/batch_eval/cmd_migrate.go
IterateRangeDescriptors(
ctx context.Context,
Expand Down
6 changes: 1 addition & 5 deletions pkg/upgrade/upgradecluster/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/upgrade/upgradecluster",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
Expand All @@ -20,6 +19,7 @@ go_library(
"//pkg/util/ctxgroup",
"//pkg/util/log",
"//pkg/util/quotapool",
"//pkg/util/rangedesciter",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@org_golang_google_grpc//:go_default_library",
Expand All @@ -30,23 +30,19 @@ go_test(
name = "upgradecluster_test",
size = "small",
srcs = [
"client_test.go",
"helper_test.go",
"main_test.go",
"nodes_test.go",
],
args = ["-test.timeout=55s"],
embed = [":upgradecluster"],
deps = [
"//pkg/keys",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
Expand Down
53 changes: 5 additions & 48 deletions pkg/upgrade/upgradecluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ package upgradecluster
import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -23,7 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/cockroach/pkg/util/rangedesciter"
"github.com/cockroachdb/redact"
"google.golang.org/grpc"
)
Expand All @@ -42,6 +41,9 @@ type ClusterConfig struct {
// Dialer constructs connections to other nodes.
Dialer NodeDialer

// RangeDescIterator iterates through all range descriptors.
RangeDescIterator rangedesciter.Iterator

// DB provides access the kv.DB instance backing the cluster.
//
// TODO(irfansharif): We could hide the kv.DB instance behind an interface
Expand Down Expand Up @@ -143,50 +145,5 @@ func (c *Cluster) ForEveryNode(
func (c *Cluster) IterateRangeDescriptors(
ctx context.Context, blockSize int, init func(), fn func(...roachpb.RangeDescriptor) error,
) error {
return c.c.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Inform the caller that we're starting a fresh attempt to page in
// range descriptors.
init()

// Iterate through meta{1,2} to pull out all the range descriptors.
var lastRangeIDInMeta1 roachpb.RangeID
return txn.Iterate(ctx, keys.MetaMin, keys.MetaMax, blockSize,
func(rows []kv.KeyValue) error {
descriptors := make([]roachpb.RangeDescriptor, 0, len(rows))
var desc roachpb.RangeDescriptor
for _, row := range rows {
err := row.ValueProto(&desc)
if err != nil {
return errors.Wrapf(err, "unable to unmarshal range descriptor from %s", row.Key)
}

// In small enough clusters it's possible for the same range
// descriptor to be stored in both meta1 and meta2. This
// happens when some range spans both the meta and the user
// keyspace. Consider when r1 is [/Min,
// /System/NodeLiveness); we'll store the range descriptor
// in both /Meta2/<r1.EndKey> and in /Meta1/KeyMax[1].
//
// As part of iterator we'll de-duplicate this descriptor
// away by checking whether we've seen it before in meta1.
// Since we're scanning over the meta range in sorted
// order, it's enough to check against the last range
// descriptor we've seen in meta1.
//
// [1]: See kvserver.rangeAddressing.
if desc.RangeID == lastRangeIDInMeta1 {
continue
}

descriptors = append(descriptors, desc)
if keys.InMeta1(keys.RangeMetaKey(desc.StartKey)) {
lastRangeIDInMeta1 = desc.RangeID
}
}

// Invoke fn with the current chunk (of size ~blockSize) of
// range descriptors.
return fn(descriptors...)
})
})
return c.c.RangeDescIterator.Iterate(ctx, blockSize, init, fn)
}
39 changes: 39 additions & 0 deletions pkg/util/rangedesciter/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "rangedesciter",
srcs = ["rangedesciter.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/util/rangedesciter",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/kv",
"//pkg/roachpb",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "rangedesciter_test",
srcs = [
"main_test.go",
"rangedesciter_test.go",
],
args = ["-test.timeout=295s"],
deps = [
":rangedesciter",
"//pkg/keys",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql/tests",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
],
)

get_x_data(name = "get_x_data")
31 changes: 31 additions & 0 deletions pkg/util/rangedesciter/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangedesciter_test

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security/securityassets"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

func TestMain(m *testing.M) {
securityassets.SetLoader(securitytest.EmbeddedAssets)
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}

//go:generate ../../util/leaktest/add-leaktest.sh *_test.go
Loading

0 comments on commit 75688c2

Please sign in to comment.