Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
129758: parser,mt: remove CREATE TENANT ... LIKE r=dt a=dt

The problem -- needing to apply a large number of configuration knobs -- solved by the 'template tenant' functionality has since been solved instead by having named service modes for each distinct common set of configurations, for example 'shared' which implies all capabilities in addition to in-process service. The template functionality was never pushed on PCR UA users, and is not used in cloud, and thus has no users and is accordingly removed here.

Release note: none.
Epic: none.

130054: rac2: add AdmittedTracker interface for use by RangeController r=pav-kv,kvoli a=sumeerbhola

The AdmittedTracker provides the latest AdmittedVector, and will be implemented by processorImpl.

Informs #129508

Epic: CRDB-37515

Release note: None

130096: release: update_versions: continue, even if one repo attempt fails r=celiala a=celiala

Follow-up to #130063: after merging 130063, update_versions failed to complete[1], due to encountering another / different error.

This commit adjusts update_versions so that it continues & attempts to create PRs for all the remaining repos, despite encountering errors along the way.

[1] Latest "Create version update PRs" job: https://teamcity.cockroachdb.com/buildConfiguration/Internal_Cockroach_Release_Publish_CreateVersionUpdatePRs/16752534?buildTab=log&focusLine=3552&logView=flowAware&expandAll=true

Release note: None
Epic: None
Release justification: release-infra only change.

130111: execinfrapb: move MockDistSQLServer to flowinfra r=dt a=dt

pb packages are intended to be leaf packages with few or no dependencies. The mock server in the test utils was pulling in dependencies on packages that encode business logic, including logic that wants to depend on pb packages such as pkg/rpc's auth logic that depends on request types.

Release note: none.
Epic: none.

Co-authored-by: David Taylor <[email protected]>
Co-authored-by: sumeerbhola <[email protected]>
Co-authored-by: Celia La <[email protected]>
  • Loading branch information
4 people committed Sep 4, 2024
5 parents 268e3b3 + 1624be3 + 4cab332 + 58f0a44 + 7473273 commit 77606cb
Show file tree
Hide file tree
Showing 23 changed files with 131 additions and 471 deletions.
25 changes: 3 additions & 22 deletions pkg/ccl/crosscluster/physical/stream_ingestion_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func streamIngestionJobDescription(
ReplicationSourceTenantName: streamIngestion.ReplicationSourceTenantName,
ReplicationSourceAddress: tree.NewDString(redactedSourceAddr),
Options: streamIngestion.Options,
Like: streamIngestion.Like,
}
ann := p.ExtendedEvalContext().Annotations
return tree.AsStringWithFQNames(redactedCreateStmt, ann), nil
Expand All @@ -74,11 +73,6 @@ func ingestionTypeCheck(
ingestionStmt.ReplicationSourceAddress,
ingestionStmt.Options.Retention},
}
if ingestionStmt.Like.OtherTenant != nil {
toTypeCheck = append(toTypeCheck,
exprutil.TenantSpec{TenantSpec: ingestionStmt.Like.OtherTenant},
)
}

if err := exprutil.TypeCheck(ctx, "INGESTION", p.SemaCtx(), toTypeCheck...); err != nil {
return false, nil, err
Expand Down Expand Up @@ -117,15 +111,6 @@ func ingestionPlanHook(
return nil, nil, nil, false, err
}

var likeTenantID uint64
var likeTenantName string
if ingestionStmt.Like.OtherTenant != nil {
_, likeTenantID, likeTenantName, err = exprEval.TenantSpec(ctx, ingestionStmt.Like.OtherTenant)
if err != nil {
return nil, nil, nil, false, err
}
}

evalCtx := &p.ExtendedEvalContext().Context
options, err := evalTenantReplicationOptions(ctx, ingestionStmt.Options, exprEval, evalCtx, p.SemaCtx(), createReplicationOp)
if err != nil {
Expand Down Expand Up @@ -175,12 +160,8 @@ func ingestionPlanHook(
// If we don't have a resume timestamp, make a new tenant
jobID := p.ExecCfg().JobRegistry.MakeJobID()
var destinationTenantID roachpb.TenantID
// Determine which template will be used as config template to
// create the new tenant below.
tenantInfo, err := sql.GetTenantTemplate(ctx, p.ExecCfg().Settings, p.InternalSQLTxn(), nil, likeTenantID, likeTenantName)
if err != nil {
return err
}

var tenantInfo mtinfopb.TenantInfoWithUsage

// Create a new tenant for the replication stream.
tenantInfo.PhysicalReplicationConsumerJobID = jobID
Expand All @@ -197,7 +178,7 @@ func ingestionPlanHook(
ctx, p.ExecCfg().Codec, p.ExecCfg().Settings,
p.InternalSQLTxn(),
p.ExecCfg().SpanConfigKVAccessor.WithISQLTxn(ctx, p.InternalSQLTxn()),
tenantInfo, initialTenantZoneConfig,
&tenantInfo, initialTenantZoneConfig,
ingestionStmt.IfNotExists,
p.ExecCfg().TenantTestingKnobs,
)
Expand Down
24 changes: 18 additions & 6 deletions pkg/cmd/release/update_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,9 @@ func updateVersions(_ *cobra.Command, _ []string) error {
for _, repo := range reposToWorkOn {
repo.workOnRepoError = workOnRepo(repo)
if repo.workOnRepoError != nil {
log.Printf("workOnRepo: error occurred while working on %s: %s", repo.name(), repo.workOnRepoError)
workOnRepoErrors = append(workOnRepoErrors, repo.workOnRepoError)
err = fmt.Errorf("workOnRepo: error occurred while working on repo %s: %w", repo.name(), err)
workOnRepoErrors = append(workOnRepoErrors, err)
log.Printf("%s", err)
}
}

Expand All @@ -305,27 +306,38 @@ func updateVersions(_ *cobra.Command, _ []string) error {
// run multiple times.
prDesc, err := repo.prExists()
if err != nil {
return fmt.Errorf("checking pr: %w", err)
err = fmt.Errorf("error while checking if pull request exists for repo %s: %w", repo.name(), err)
workOnRepoErrors = append(workOnRepoErrors, err)
log.Printf("%s", err)
continue
}
if prDesc != "" {
log.Printf("pull request for %s already exists: %s", repo.name(), prDesc)
continue
}
log.Printf("pushing changes to repo %s in %s", repo.name(), dest)
if err := repo.push(); err != nil {
return fmt.Errorf("cannot push changes for %s: %w", repo.name(), err)
err = fmt.Errorf("error while pushing changes to repo %s: %w", repo.name(), err)
workOnRepoErrors = append(workOnRepoErrors, err)
log.Printf("%s", err)
continue
}
log.Printf("creating pull request for %s in %s", repo.name(), dest)
pr, err := repo.createPullRequest()
if err != nil {
return fmt.Errorf("cannot create pull request for %s: %w", repo.name(), err)
err = fmt.Errorf("error creating pull request for %s: %w", repo.name(), err)
workOnRepoErrors = append(workOnRepoErrors, err)
log.Printf("%s", err)
continue
}
log.Printf("Created PR: %s\n", pr)
prs = append(prs, pr)
}

if err := sendPrReport(releasedVersion, prs, smtpPassword); err != nil {
return fmt.Errorf("cannot send email: %w", err)
err = fmt.Errorf("error sending email: %w", err)
workOnRepoErrors = append(workOnRepoErrors, err)
log.Printf("%s", err)
}
if len(workOnRepoErrors) > 0 {
return errors.Join(workOnRepoErrors...)
Expand Down
43 changes: 21 additions & 22 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,17 @@ type RangeController interface {
// TODO(pav-kv): This interface a placeholder for the interface containing raft
// methods. Replace this as part of #128019.
type RaftInterface interface {
// FollowerState returns the current state of a follower. The value of
// Match, Next, Admitted are populated iff in StateReplicate. All entries >=
// FollowerStateRaftMuLocked returns the current state of a follower. The
// value of Match, Next are populated iff in StateReplicate. All entries >=
// Next have not had MsgApps constructed during the lifetime of this
// StateReplicate (they may have been constructed previously).
//
// When a follower transitions from {StateProbe,StateSnapshot} =>
// StateReplicate, we start trying to send MsgApps. We should
// notice such transitions both in HandleRaftEvent and SetReplicasLocked.
//
// RACv1 also cared about three other cases where the follower behaved as if
// it were disconnected (a) paused follower, (b) follower is behind, (c)
// follower is inactive (see
// replicaFlowControlIntegrationImpl.notActivelyReplicatingTo). (b) and (c)
// were needed since it paced at rate of slowest replica, while for regular
// work we will in v2 pace at slowest in quorum (and we don't care about
// elastic experiencing a hiccup, given it paces at rate of slowest). For
// (a), we plan to remove follower pausing. So the v2 code will be
// simplified.
//
// Requires Replica.raftMu to be held, Replica.mu is not held.
FollowerState(replicaID roachpb.ReplicaID) FollowerStateInfo
FollowerStateRaftMuLocked(replicaID roachpb.ReplicaID) FollowerStateInfo
}

type FollowerStateInfo struct {
Expand All @@ -108,10 +98,17 @@ type FollowerStateInfo struct {
// (Match, Next) is in-flight.
Match uint64
Next uint64
// TODO(kvoli): Find a better home for this, we need it for token return.
Term uint64
// Invariant: Admitted[i] <= Match.
Admitted [raftpb.NumPriorities]uint64
}

// AdmittedTracker is used to retrieve the latest admitted vector for a
// replica (including the leader).
type AdmittedTracker interface {
// GetAdmitted returns the latest AdmittedVector for replicaID. It returns
// an empty struct if the replicaID is not known. NB: the
// AdmittedVector.Admitted[i] value can transiently advance past
// FollowerStateInfo.Match, since the admitted tracking subsystem is
// separate from Raft.
GetAdmitted(replicaID roachpb.ReplicaID) AdmittedVector
}

// RaftEvent carries a RACv2-relevant subset of raft state sent to storage.
Expand Down Expand Up @@ -203,6 +200,7 @@ type RangeControllerOptions struct {
RaftInterface RaftInterface
Clock *hlc.Clock
CloseTimerScheduler ProbeToCloseTimerScheduler
AdmittedTracker AdmittedTracker
EvalWaitMetrics *EvalWaitMetrics
}

Expand Down Expand Up @@ -348,7 +346,7 @@ retry:
func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e RaftEvent) error {
shouldWaitChange := false
for r, rs := range rc.replicaMap {
info := rc.opts.RaftInterface.FollowerState(r)
info := rc.opts.RaftInterface.FollowerStateRaftMuLocked(r)
shouldWaitChange = rs.handleReadyState(ctx, info) || shouldWaitChange
}
// If there was a quorum change, update the voter sets, triggering the
Expand Down Expand Up @@ -504,7 +502,7 @@ func NewReplicaState(
sendTokenCounter: parent.opts.SSTokenCounter.Send(stream),
desc: desc,
}
state := parent.opts.RaftInterface.FollowerState(desc.ReplicaID)
state := parent.opts.RaftInterface.FollowerStateRaftMuLocked(desc.ReplicaID)
if state.State == tracker.StateReplicate {
rs.createReplicaSendStream()
}
Expand Down Expand Up @@ -655,7 +653,7 @@ func (rs *replicaState) handleReadyState(
rs.createReplicaSendStream()
shouldWaitChange = true
} else {
shouldWaitChange = rs.sendStream.makeConsistentInStateReplicate(ctx, info)
shouldWaitChange = rs.sendStream.makeConsistentInStateReplicate(ctx)
}

case tracker.StateSnapshot:
Expand Down Expand Up @@ -692,11 +690,12 @@ func (rss *replicaState) closeSendStream(ctx context.Context) {
}

func (rss *replicaSendStream) makeConsistentInStateReplicate(
ctx context.Context, info FollowerStateInfo,
ctx context.Context,
) (shouldWaitChange bool) {
av := rss.parent.parent.opts.AdmittedTracker.GetAdmitted(rss.parent.desc.ReplicaID)
rss.mu.Lock()
defer rss.mu.Unlock()
defer rss.returnTokens(ctx, rss.mu.tracker.Untrack(info.Term, info.Admitted))
defer rss.returnTokens(ctx, rss.mu.tracker.Untrack(av.Term, av.Admitted))

// The leader is always in state replicate.
if rss.parent.parent.opts.LocalReplicaID == rss.parent.desc.ReplicaID {
Expand Down
25 changes: 22 additions & 3 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ func (s *testingRCState) getOrInitRange(r testingRange) *testingRCRange {
RaftInterface: testRC,
Clock: s.clock,
CloseTimerScheduler: s.probeToCloseScheduler,
AdmittedTracker: testRC,
EvalWaitMetrics: s.evalMetrics,
}

Expand Down Expand Up @@ -272,7 +273,7 @@ type testingRCRange struct {
}
}

func (r *testingRCRange) FollowerState(replicaID roachpb.ReplicaID) FollowerStateInfo {
func (r *testingRCRange) FollowerStateRaftMuLocked(replicaID roachpb.ReplicaID) FollowerStateInfo {
r.mu.Lock()
defer r.mu.Unlock()

Expand All @@ -283,6 +284,17 @@ func (r *testingRCRange) FollowerState(replicaID roachpb.ReplicaID) FollowerStat
return replica.info
}

func (r *testingRCRange) GetAdmitted(replicaID roachpb.ReplicaID) AdmittedVector {
r.mu.Lock()
defer r.mu.Unlock()

replica, ok := r.mu.r.replicaSet[replicaID]
if !ok {
return AdmittedVector{}
}
return replica.av
}

func (r *testingRCRange) startWaitForEval(name string, pri admissionpb.WorkPriority) {
r.mu.Lock()
defer r.mu.Unlock()
Expand Down Expand Up @@ -320,8 +332,8 @@ func (r *testingRCRange) admit(
for _, replica := range r.mu.r.replicaSet {
if replica.desc.StoreID == storeID {
replica := replica
replica.info.Admitted[AdmissionToRaftPriority(pri)] = toIndex
replica.info.Term = term
replica.av.Admitted[AdmissionToRaftPriority(pri)] = toIndex
replica.av.Term = term
r.mu.r.replicaSet[replica.desc.ReplicaID] = replica
break
}
Expand Down Expand Up @@ -352,6 +364,7 @@ const invalidTrackerState = tracker.StateSnapshot + 1
type testingReplica struct {
desc roachpb.ReplicaDescriptor
info FollowerStateInfo
av AdmittedVector
}

func scanRanges(t *testing.T, input string) []testingRange {
Expand Down Expand Up @@ -796,6 +809,12 @@ func TestRangeController(t *testing.T) {
term, err = strconv.Atoi(parts[1])
require.NoError(t, err)

// TODO(sumeer): the test input only specifies an
// incremental change to the admitted vector, for a
// single priority. However, in practice, the whole
// vector will be updated, which also cleanly handles
// the case of an advancing term. Consider changing
// this to accept a non-incremental update.
parts[2] = strings.TrimSpace(parts[2])
require.True(t, strings.HasPrefix(parts[2], "to_index="))
parts[2] = strings.TrimPrefix(strings.TrimSpace(parts[2]), "to_index=")
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type tracked struct {

func (dt *Tracker) Init(stream kvflowcontrol.Stream) {
*dt = Tracker{
tracked: [int(raftpb.NumPriorities)][]tracked{},
tracked: [raftpb.NumPriorities][]tracked{},
stream: stream,
}
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,8 @@ type processorImpl struct {

var _ Processor = &processorImpl{}

var _ rac2.AdmittedTracker = &processorImpl{}

func NewProcessor(opts ProcessorOptions) Processor {
p := &processorImpl{opts: opts}
p.mu.enabledWhenLeader = opts.EnabledWhenLeaderLevel
Expand Down Expand Up @@ -970,6 +972,12 @@ func admittedIncreased(prev, next [raftpb.NumPriorities]uint64) bool {
return false
}

// GetAdmitted implements rac2.AdmittedTracker.
func (p *processorImpl) GetAdmitted(replicaID roachpb.ReplicaID) rac2.AdmittedVector {
// TODO(pav-kv): implement
return rac2.AdmittedVector{}
}

// RangeControllerFactoryImpl implements RangeControllerFactory.
//
// TODO(sumeer): replace with real implementation once RangeController impl is
Expand Down
1 change: 1 addition & 0 deletions pkg/settings/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ var retiredSettings = map[InternalKey]struct{}{

// removed as of 24.3
"bulkio.backup.split_keys_on_timestamps": {},
"sql.create_tenant.default_template": {},
}

// sqlDefaultSettings is the list of "grandfathered" existing sql.defaults
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colflow/colrpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ go_test(
"//pkg/sql/colmem",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/flowinfra",
"//pkg/sql/types",
"//pkg/testutils",
"//pkg/util/cancelchecker",
Expand Down
13 changes: 7 additions & 6 deletions pkg/sql/colflow/colrpc/colrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
Expand Down Expand Up @@ -135,7 +136,7 @@ func TestOutboxInbox(t *testing.T) {
defer stopper.Stop(ctx)

clock := hlc.NewClockForTesting(nil)
_, mockServer, addr, err := execinfrapb.StartMockDistSQLServer(ctx, clock, stopper, execinfra.StaticSQLInstanceID)
_, mockServer, addr, err := flowinfra.StartMockDistSQLServer(ctx, clock, stopper, execinfra.StaticSQLInstanceID)
require.NoError(t, err)

// Generate a random cancellation scenario.
Expand Down Expand Up @@ -490,7 +491,7 @@ func TestInboxHostCtxCancellation(t *testing.T) {
defer stopper.Stop(ctx)

clock := hlc.NewClockForTesting(nil)
_, mockServer, addr, err := execinfrapb.StartMockDistSQLServer(ctx, clock, stopper, execinfra.StaticSQLInstanceID)
_, mockServer, addr, err := flowinfra.StartMockDistSQLServer(ctx, clock, stopper, execinfra.StaticSQLInstanceID)
require.NoError(t, err)

rng, _ := randutil.NewTestRand()
Expand Down Expand Up @@ -578,7 +579,7 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

_, mockServer, addr, err := execinfrapb.StartMockDistSQLServer(ctx,
_, mockServer, addr, err := flowinfra.StartMockDistSQLServer(ctx,
hlc.NewClockForTesting(nil), stopper, execinfra.StaticSQLInstanceID,
)
require.NoError(t, err)
Expand Down Expand Up @@ -773,7 +774,7 @@ func BenchmarkOutboxInbox(b *testing.B) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

_, mockServer, addr, err := execinfrapb.StartMockDistSQLServer(ctx,
_, mockServer, addr, err := flowinfra.StartMockDistSQLServer(ctx,
hlc.NewClockForTesting(nil), stopper, execinfra.StaticSQLInstanceID,
)
require.NoError(b, err)
Expand Down Expand Up @@ -848,11 +849,11 @@ func TestOutboxStreamIDPropagation(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

_, mockServer, addr, err := execinfrapb.StartMockDistSQLServer(ctx,
_, mockServer, addr, err := flowinfra.StartMockDistSQLServer(ctx,
hlc.NewClockForTesting(nil), stopper, execinfra.StaticSQLInstanceID,
)
require.NoError(t, err)
dialer := &execinfrapb.MockDialer{Addr: addr}
dialer := &flowinfra.MockDialer{Addr: addr}
defer dialer.Close()

typs := []*types.T{types.Int}
Expand Down
Loading

0 comments on commit 77606cb

Please sign in to comment.