diff --git a/pkg/kv/kvnemesis/validator.go b/pkg/kv/kvnemesis/validator.go
index 108d1f90ec2a..5ad450fd5e7c 100644
--- a/pkg/kv/kvnemesis/validator.go
+++ b/pkg/kv/kvnemesis/validator.go
@@ -339,12 +339,10 @@ func (v *validator) processOp(txnID *string, op Operation) {
 			v.failIfError(op, t.Result)
 		}
 	case *TransferLeaseOperation:
-		if resultIsErrorStr(t.Result, `cannot transfer lease to replica of type (VOTER_INCOMING|VOTER_OUTGOING|VOTER_DEMOTING|LEARNER|NON_VOTER)`) {
+		if resultIsErrorStr(t.Result, `replica cannot hold lease`) {
 			// Only VOTER_FULL replicas can currently hold a range lease.
 			// Attempts to transfer the lease to any other replica type are
 			// rejected.
-		} else if resultIsErrorStr(t.Result, `replica cannot hold lease`) {
-			// Same case as above, but with a slightly different message.
 		} else if resultIsErrorStr(t.Result, `unable to find store \d+ in range`) {
 			// A lease transfer that races with a replica removal may find that
 			// the store it was targeting is no longer part of the range.
diff --git a/pkg/kv/kvserver/batcheval/cmd_lease_test.go b/pkg/kv/kvserver/batcheval/cmd_lease_test.go
index 1e4bfdc7eaaf..d5cd5efa023e 100644
--- a/pkg/kv/kvserver/batcheval/cmd_lease_test.go
+++ b/pkg/kv/kvserver/batcheval/cmd_lease_test.go
@@ -190,6 +190,10 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) {
 
 			manual := hlc.NewManualClock(123)
 			clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)
+			prevLease := roachpb.Lease{
+				Replica:  replicas[0],
+				Sequence: 1,
+			}
 			nextLease := roachpb.Lease{
 				Replica: replicas[1],
 				Start:   clock.NowAsClockTimestamp(),
@@ -209,16 +213,19 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) {
 			}
 			currentReadSummary := rspb.FromTimestamp(maxPriorReadTS)
 
+			evalCtx := &MockEvalCtx{
+				ClusterSettings:    cluster.MakeTestingClusterSettings(),
+				StoreID:            1,
+				Desc:               &desc,
+				Clock:              clock,
+				Lease:              prevLease,
+				CurrentReadSummary: currentReadSummary,
+			}
 			cArgs := CommandArgs{
-				EvalCtx: (&MockEvalCtx{
-					ClusterSettings:    cluster.MakeTestingClusterSettings(),
-					StoreID:            1,
-					Desc:               &desc,
-					Clock:              clock,
-					CurrentReadSummary: currentReadSummary,
-				}).EvalContext(),
+				EvalCtx: evalCtx.EvalContext(),
 				Args: &roachpb.TransferLeaseRequest{
-					Lease: nextLease,
+					Lease:     nextLease,
+					PrevLease: prevLease,
 				},
 			}
 
@@ -233,6 +240,10 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) {
 			require.NotNil(t, propLease)
 			require.True(t, nextLease.Start.Less(propLease.Start))
 			require.True(t, beforeEval.Less(propLease.Start))
+			require.Equal(t, prevLease.Sequence+1, propLease.Sequence)
+
+			// The previous lease should have been revoked.
+			require.Equal(t, prevLease.Sequence, evalCtx.RevokedLeaseSeq)
 
 			// The prior read summary should reflect the maximum read times
 			// served under the current leaseholder.
diff --git a/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go b/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
index badf400f007d..bb59285f73c0 100644
--- a/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
+++ b/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
@@ -77,11 +77,38 @@ func TransferLease(
 	newLease.Start.Forward(cArgs.EvalCtx.Clock().NowAsClockTimestamp())
 	args.Lease = roachpb.Lease{} // prevent accidental use below
 
+	// If this check is removed at some point, the filtering of learners on the
+	// sending side would have to be removed as well.
+	if err := roachpb.CheckCanReceiveLease(newLease.Replica, cArgs.EvalCtx.Desc()); err != nil {
+		return newFailedLeaseTrigger(true /* isTransfer */), err
+	}
+
+	// Stop using the current lease. All future calls to leaseStatus on this
+	// node with the current lease will now return a PROSCRIBED status. This
+	// includes calls to leaseStatus from the closed timestamp side-transport,
+	// meaning that the following call to GetCurrentReadSummary is guaranteed to
+	// observe the highest closed timestamp published under this lease.
+	//
+	// We perform this action during evaluation to ensure that the lease
+	// revocation takes place regardless of whether the corresponding Raft
+	// proposal succeeds, fails, or is ambiguous - in which case there's no
+	// guarantee that the transfer will not still apply. This means that if the
+	// proposal fails, we'll have relinquished the current lease but not managed
+	// to give the lease to someone else, so we'll have to re-acquire the lease
+	// again through a RequestLease request to recover. This situation is tested
+	// in TestBehaviorDuringLeaseTransfer/transferSucceeds=false.
+	//
+	// NOTE: RevokeLease will be a no-op if the lease has already changed. In
+	// such cases, we could detect that here and fail fast, but it's safe and
+	// easier to just let the TransferLease be proposed under the wrong lease
+	// and be rejected with the correct error below Raft.
+	cArgs.EvalCtx.RevokeLease(ctx, args.PrevLease.Sequence)
+
 	// Collect a read summary from the outgoing leaseholder to ship to the
 	// incoming leaseholder. This is used to instruct the new leaseholder on how
 	// to update its timestamp cache to ensure that no future writes are allowed
 	// to invalidate prior reads.
-	priorReadSum := cArgs.EvalCtx.GetCurrentReadSummary()
+	priorReadSum, _ := cArgs.EvalCtx.GetCurrentReadSummary()
 	// For now, forward this summary to the proposed lease's start time. This
 	// may appear to undermine the benefit of the read summary, but it doesn't
 	// entirely. Until we ship higher-resolution read summaries, the read
@@ -94,12 +121,6 @@ func TransferLease(
 	// think about.
 	priorReadSum.Merge(rspb.FromTimestamp(newLease.Start.ToTimestamp()))
 
-	// If this check is removed at some point, the filtering of learners on the
-	// sending side would have to be removed as well.
-	if err := roachpb.CheckCanReceiveLease(newLease.Replica, cArgs.EvalCtx.Desc()); err != nil {
-		return newFailedLeaseTrigger(true /* isTransfer */), err
-	}
-
 	log.VEventf(ctx, 2, "lease transfer: prev lease: %+v, new lease: %+v", prevLease, newLease)
 	return evalNewLease(ctx, cArgs.EvalCtx, readWriter, cArgs.Stats, newLease, prevLease,
 		&priorReadSum, false /* isExtension */, true /* isTransfer */)
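A note on the request plumbing above: the test now threads the outgoing lease through TransferLeaseRequest.PrevLease, which is what lets evaluation revoke it by sequence number. A minimal, hypothetical helper (not part of this diff) that builds such a request would look like this:

```go
// makeTransferLeaseRequest is an illustrative helper, not code from this
// change. It shows the two lease fields that evaluation now relies on.
func makeTransferLeaseRequest(prev, next roachpb.Lease) *roachpb.TransferLeaseRequest {
	return &roachpb.TransferLeaseRequest{
		Lease:     next, // the lease proposed for the new leaseholder
		PrevLease: prev, // revoked via RevokeLease(ctx, prev.Sequence) during evaluation
	}
}
```

If PrevLease is stale, the revocation is a no-op and the proposal is rejected below Raft, as the NOTE in the comment above explains.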
diff --git a/pkg/kv/kvserver/batcheval/cmd_subsume.go b/pkg/kv/kvserver/batcheval/cmd_subsume.go
index 61610efa99ce..18cb3c5a5dd5 100644
--- a/pkg/kv/kvserver/batcheval/cmd_subsume.go
+++ b/pkg/kv/kvserver/batcheval/cmd_subsume.go
@@ -110,6 +110,21 @@ func Subsume(
 		return result.Result{}, errors.AssertionFailedf("non-deletion intent on local range descriptor")
 	}
 
+	// NOTE: the deletion intent on the range's meta2 descriptor is just as
+	// important to correctness as the deletion intent on the local descriptor,
+	// but the check is too expensive as it would involve a network roundtrip on
+	// most nodes.
+
+	// Freeze the range. Do so by blocking all requests while a newly launched
+	// async goroutine watches (pushes with low priority) the merge transaction.
+	// This will also block the closed timestamp side-transport from closing new
+	// timestamps, meaning that the following call to GetCurrentReadSummary is
+	// guaranteed to observe the highest closed timestamp ever published by this
+	// range (if the merge eventually completes).
+	if err := cArgs.EvalCtx.WatchForMerge(ctx); err != nil {
+		return result.Result{}, errors.Wrap(err, "watching for merge during subsume")
+	}
+
 	// We prevent followers of the RHS from being able to serve follower reads on
 	// timestamps that fall in the timestamp window representing the range's
 	// subsumed state (i.e. between the subsumption time (FreezeStart) and the
@@ -142,33 +157,25 @@ func Subsume(
 	// is subsumed, we ensure that the initial MLAI update broadcast by the new
 	// leaseholder respects the invariant in question, in much the same way we do
 	// here. Take a look at `EmitMLAI()` in replica_closedts.go for more details.
+	//
+	// TODO(nvanbenschoten): remove this in v21.2 when the rest of the v1 closed
+	// timestamp system disappears.
 	_, untrack := cArgs.EvalCtx.GetTracker().Track(ctx)
 	lease, _ := cArgs.EvalCtx.GetLease()
 	lai := cArgs.EvalCtx.GetLeaseAppliedIndex()
 	untrack(ctx, ctpb.Epoch(lease.Epoch), desc.RangeID, ctpb.LAI(lai+1))
 
-	// NOTE: the deletion intent on the range's meta2 descriptor is just as
-	// important to correctness as the deletion intent on the local descriptor,
-	// but the check is too expensive as it would involve a network roundtrip on
-	// most nodes.
-
+	// Now that the range is frozen, collect some information to ship to the LHS
+	// leaseholder through the merge trigger.
 	reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats()
 	reply.LeaseAppliedIndex = lai
 	reply.FreezeStart = cArgs.EvalCtx.Clock().NowAsClockTimestamp()
-	// FrozenClosedTimestamp might return an empty timestamp if the Raft-based
-	// closed timestamp transport hasn't been enabled yet. That's OK because, if
-	// the new transport is not enabled, then ranges with leading closed
-	// timestamps can't exist yet, and so the closed timestamp must be below the
-	// FreezeStart. The FreezeStart is used by Store.MergeRange to bump the RHS'
-	// ts cache if LHS/RHS leases are not collocated. The case when the leases are
-	// collocated also works out because then the closed timestamp (according to
-	// the old mechanism) is the same for both ranges being merged.
-	reply.ClosedTimestamp = cArgs.EvalCtx.GetFrozenClosedTimestamp()
+
 	// Collect a read summary from the RHS leaseholder to ship to the LHS
 	// leaseholder. This is used to instruct the LHS on how to update its
 	// timestamp cache to ensure that no future writes are allowed to invalidate
 	// prior reads performed to this point on the RHS range.
-	priorReadSum := cArgs.EvalCtx.GetCurrentReadSummary()
+	priorReadSum, closedTS := cArgs.EvalCtx.GetCurrentReadSummary()
 	// For now, forward this summary to the freeze time. This may appear to
 	// undermine the benefit of the read summary, but it doesn't entirely. Until
 	// we ship higher-resolution read summaries, the read summary doesn't
@@ -181,8 +188,16 @@ func Subsume(
 	// think about.
 	priorReadSum.Merge(rspb.FromTimestamp(reply.FreezeStart.ToTimestamp()))
 	reply.ReadSummary = &priorReadSum
+	// NOTE FOR v21.1: GetCurrentReadSummary might return an empty timestamp if
+	// the Raft-based closed timestamp transport hasn't been enabled yet. That's
+	// OK because, if the new transport is not enabled, then ranges with leading
+	// closed timestamps can't exist yet, and so the closed timestamp must be
+	// below the FreezeStart. The FreezeStart is used by Store.MergeRange to
+	// bump the RHS' ts cache if LHS/RHS leases are not collocated. The case
+	// when the leases are collocated also works out because then the closed
+	// timestamp (according to the old mechanism) is the same for both ranges
+	// being merged.
+	reply.ClosedTimestamp = closedTS
 
-	return result.Result{
-		Local: result.LocalResult{FreezeStart: reply.FreezeStart.ToTimestamp()},
-	}, nil
+	return result.Result{}, nil
 }
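The ordering above is the crux of the Subsume change: the range (and with it the closed timestamp side-transport) must be frozen before the read summary and closed timestamp are captured. A compressed sketch of the new tail of Subsume, with the surrounding declarations, MVCC stats, and v1 closed timestamp plumbing omitted (the helper name is hypothetical):

```go
// Illustrative only; see the full diff above for the real code.
func subsumeFreezeAndSummarize(
	ctx context.Context, cArgs batcheval.CommandArgs, reply *roachpb.SubsumeResponse,
) error {
	// Freeze the range first. This also stops the side-transport from
	// closing new timestamps under this range.
	if err := cArgs.EvalCtx.WatchForMerge(ctx); err != nil {
		return err
	}
	// Only now is it safe to snapshot the freeze time, the read summary, and
	// the closed timestamp: nothing can advance them behind our back.
	reply.FreezeStart = cArgs.EvalCtx.Clock().NowAsClockTimestamp()
	priorReadSum, closedTS := cArgs.EvalCtx.GetCurrentReadSummary()
	priorReadSum.Merge(rspb.FromTimestamp(reply.FreezeStart.ToTimestamp()))
	reply.ReadSummary = &priorReadSum
	reply.ClosedTimestamp = closedTS
	return nil
}
```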
diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go
index 747e805b885d..771a25cf9572 100644
--- a/pkg/kv/kvserver/batcheval/eval_context.go
+++ b/pkg/kv/kvserver/batcheval/eval_context.go
@@ -97,18 +97,33 @@ type EvalContext interface {
 	GetLastReplicaGCTimestamp(context.Context) (hlc.Timestamp, error)
 	GetLease() (roachpb.Lease, roachpb.Lease)
 	GetRangeInfo(context.Context) roachpb.RangeInfo
-	GetFrozenClosedTimestamp() hlc.Timestamp
 
 	// GetCurrentReadSummary returns a new ReadSummary reflecting all reads
 	// served by the range to this point. The method requires a write latch
 	// across all keys in the range (see declareAllKeys), because it will only
 	// return a meaningful summary if the caller has serialized with all other
 	// requests on the range.
-	GetCurrentReadSummary() rspb.ReadSummary
+	//
+	// The method also returns the current closed timestamp on the range. This
+	// closed timestamp is already incorporated into the read summary, but some
+	// callers also need it separated out. It is expected that a caller will
+	// have performed some action (either calling RevokeLease or WatchForMerge)
+	// to freeze further progression of the closed timestamp before calling this
+	// method.
+	GetCurrentReadSummary() (rspb.ReadSummary, hlc.Timestamp)
 
 	GetExternalStorage(ctx context.Context, dest roachpb.ExternalStorage) (cloud.ExternalStorage, error)
 	GetExternalStorageFromURI(ctx context.Context, uri string, user security.SQLUsername) (cloud.ExternalStorage, error)
+
+	// RevokeLease stops the replica from using its current lease, if that lease
+	// matches the provided lease sequence. All future calls to leaseStatus on
+	// this node with the current lease will now return a PROSCRIBED status.
+	RevokeLease(context.Context, roachpb.LeaseSequence)
+
+	// WatchForMerge arranges to block all requests until the in-progress merge
+	// completes. Returns an error if no in-progress merge is detected.
+	WatchForMerge(ctx context.Context) error
 }
 
 // MockEvalCtx is a dummy implementation of EvalContext for testing purposes.
@@ -126,6 +141,7 @@ type MockEvalCtx struct {
 	CanCreateTxn       func() (bool, hlc.Timestamp, roachpb.TransactionAbortedReason)
 	Lease              roachpb.Lease
 	CurrentReadSummary rspb.ReadSummary
+	RevokedLeaseSeq    roachpb.LeaseSequence
 }
 
 // EvalContext returns the MockEvalCtx as an EvalContext. It will reflect future
@@ -194,9 +210,6 @@ func (m *mockEvalCtxImpl) GetLeaseAppliedIndex() uint64 {
 func (m *mockEvalCtxImpl) GetTracker() closedts.TrackerI {
 	panic("unimplemented")
 }
-func (m *mockEvalCtxImpl) GetFrozenClosedTimestamp() hlc.Timestamp {
-	panic("unimplemented")
-}
 func (m *mockEvalCtxImpl) Desc() *roachpb.RangeDescriptor {
 	return m.MockEvalCtx.Desc
 }
@@ -226,8 +239,8 @@ func (m *mockEvalCtxImpl) GetLease() (roachpb.Lease, roachpb.Lease) {
 func (m *mockEvalCtxImpl) GetRangeInfo(ctx context.Context) roachpb.RangeInfo {
 	return roachpb.RangeInfo{Desc: *m.Desc(), Lease: m.Lease}
 }
-func (m *mockEvalCtxImpl) GetCurrentReadSummary() rspb.ReadSummary {
-	return m.CurrentReadSummary
+func (m *mockEvalCtxImpl) GetCurrentReadSummary() (rspb.ReadSummary, hlc.Timestamp) {
+	return m.CurrentReadSummary, hlc.Timestamp{}
 }
 func (m *mockEvalCtxImpl) GetExternalStorage(
 	ctx context.Context, dest roachpb.ExternalStorage,
@@ -239,3 +252,9 @@ func (m *mockEvalCtxImpl) GetExternalStorageFromURI(
 ) (cloud.ExternalStorage, error) {
 	panic("unimplemented")
 }
+func (m *mockEvalCtxImpl) RevokeLease(_ context.Context, seq roachpb.LeaseSequence) {
+	m.RevokedLeaseSeq = seq
+}
+func (m *mockEvalCtxImpl) WatchForMerge(ctx context.Context) error {
+	panic("unimplemented")
+}
diff --git a/pkg/kv/kvserver/batcheval/result/BUILD.bazel b/pkg/kv/kvserver/batcheval/result/BUILD.bazel
index 83fd2b90b2eb..57423b77f80b 100644
--- a/pkg/kv/kvserver/batcheval/result/BUILD.bazel
+++ b/pkg/kv/kvserver/batcheval/result/BUILD.bazel
@@ -13,7 +13,6 @@ go_library(
         "//pkg/kv/kvserver/concurrency/lock",
        "//pkg/kv/kvserver/kvserverpb",
         "//pkg/roachpb",
-        "//pkg/util/hlc",
         "//pkg/util/log",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_kr_pretty//:pretty",
diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go
index 7d444371cf86..3ac37d887c40 100644
--- a/pkg/kv/kvserver/batcheval/result/result.go
+++ b/pkg/kv/kvserver/batcheval/result/result.go
@@ -16,7 +16,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
-	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/errors"
 	"github.com/kr/pretty"
 )
@@ -64,10 +63,6 @@ type LocalResult struct {
 	MaybeAddToSplitQueue bool
 	// Call MaybeGossipNodeLiveness with the specified Span, if set.
 	MaybeGossipNodeLiveness *roachpb.Span
-	// FreezeStart indicates the high water mark timestamp beyond which the range
-	// is guaranteed to not have served any requests. This value is only set when
-	// a range merge is in progress. If set, call maybeWatchForMerge.
-	FreezeStart hlc.Timestamp
 
 	// Metrics contains counters which are to be passed to the
 	// metrics subsystem.
@@ -87,7 +82,6 @@ func (lResult *LocalResult) IsZero() bool {
 		!lResult.MaybeGossipSystemConfig &&
 		!lResult.MaybeGossipSystemConfigIfHaveFailure &&
 		lResult.MaybeGossipNodeLiveness == nil &&
-		lResult.FreezeStart.IsEmpty() &&
 		lResult.Metrics == nil
 }
 
@@ -100,13 +94,13 @@ func (lResult *LocalResult) String() string {
 		"#updated txns: %d #end txns: %d, "+
 		"GossipFirstRange:%t MaybeGossipSystemConfig:%t "+
 		"MaybeGossipSystemConfigIfHaveFailure:%t MaybeAddToSplitQueue:%t "+
-		"MaybeGossipNodeLiveness:%s FreezeStart:%s",
+		"MaybeGossipNodeLiveness:%s ",
 		lResult.Reply,
 		len(lResult.EncounteredIntents), len(lResult.AcquiredLocks), len(lResult.ResolvedLocks),
 		len(lResult.UpdatedTxns), len(lResult.EndTxns),
 		lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig,
 		lResult.MaybeGossipSystemConfigIfHaveFailure, lResult.MaybeAddToSplitQueue,
-		lResult.MaybeGossipNodeLiveness, lResult.FreezeStart)
+		lResult.MaybeGossipNodeLiveness)
 }
 
 // DetachEncounteredIntents returns (and removes) those encountered
@@ -343,13 +337,6 @@ func (p *Result) MergeAndDestroy(q Result) error {
 	}
 	q.Local.MaybeGossipNodeLiveness = nil
 
-	if p.Local.FreezeStart.IsEmpty() {
-		p.Local.FreezeStart = q.Local.FreezeStart
-	} else if !q.Local.FreezeStart.IsEmpty() {
-		return errors.AssertionFailedf("conflicting FreezeStart")
-	}
-	q.Local.FreezeStart = hlc.Timestamp{}
-
 	coalesceBool(&p.Local.GossipFirstRange, &q.Local.GossipFirstRange)
 	coalesceBool(&p.Local.MaybeGossipSystemConfig, &q.Local.MaybeGossipSystemConfig)
 	coalesceBool(&p.Local.MaybeGossipSystemConfigIfHaveFailure, &q.Local.MaybeGossipSystemConfigIfHaveFailure)
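Because the mock above now records the sequence passed to RevokeLease, unit tests can assert that evaluation revoked the outgoing lease, which is exactly what the cmd_lease_test.go change earlier in this diff does. A minimal sketch of the mock's behavior in isolation (the test name is hypothetical; imports elided):

```go
func TestMockEvalCtxRecordsRevokedLease(t *testing.T) {
	evalCtx := &batcheval.MockEvalCtx{}
	ec := evalCtx.EvalContext()

	// The mock's RevokeLease only records the sequence it was asked to
	// revoke, which is enough to verify the evaluation-time revocation.
	ec.RevokeLease(context.Background(), roachpb.LeaseSequence(1))
	require.Equal(t, roachpb.LeaseSequence(1), evalCtx.RevokedLeaseSeq)
}
```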
diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go
index b4390c522831..acb8d232ef23 100644
--- a/pkg/kv/kvserver/client_merge_test.go
+++ b/pkg/kv/kvserver/client_merge_test.go
@@ -4350,92 +4350,103 @@ func sendWithTxn(
 	return pErr.GoError()
 }
 
-// TestHistoricalReadsAfterSubsume tests that a subsumed right hand side range
-// can only serve read-only traffic for timestamps that precede the subsumption
-// time, but don't contain the subsumption time in their uncertainty interval.
-func TestHistoricalReadsAfterSubsume(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-	ctx := context.Background()
-
-	maxOffset := 100 * time.Millisecond
-	preUncertaintyTs := func(ts hlc.Timestamp) hlc.Timestamp {
-		return hlc.Timestamp{
-			WallTime: ts.GoTime().Add(-maxOffset).UnixNano() - 1,
-			Logical:  ts.Logical,
-		}
-	}
-
-	type testCase struct {
-		name          string
-		queryTsFunc   func(freezeStart hlc.Timestamp) hlc.Timestamp
-		queryArgsFunc func(key roachpb.Key) roachpb.Request
-		shouldBlock   bool
-	}
-
-	tests := []testCase{
-		// Ensure that a read query for a timestamp older than freezeStart-MaxOffset
-		// is let through.
-		{
-			name:        "historical read",
-			queryTsFunc: preUncertaintyTs,
-			queryArgsFunc: func(key roachpb.Key) roachpb.Request {
-				return getArgs(key)
-			},
-			shouldBlock: false,
-		},
-		// Write queries for the same historical timestamp should block (and then
-		// eventually fail because the range no longer exists).
-		{
-			name:        "historical write",
-			queryTsFunc: preUncertaintyTs,
-			queryArgsFunc: func(key roachpb.Key) roachpb.Request {
-				return putArgs(key, []byte(`test value`))
-			},
-			shouldBlock: true,
-		},
-		// Read queries that contain the subsumption time in its uncertainty interval
-		// should block and eventually fail.
-		{
-			name: "historical read with uncertainty",
-			queryTsFunc: func(freezeStart hlc.Timestamp) hlc.Timestamp {
-				return freezeStart.Prev()
-			},
-			queryArgsFunc: func(key roachpb.Key) roachpb.Request {
-				return getArgs(key)
-			},
-			shouldBlock: true,
-		},
-	}
-
-	for _, test := range tests {
-		t.Run(test.name, func(t *testing.T) {
-			tc, store, rhsDesc, freezeStart, waitForBlocked, cleanupFunc :=
-				setupClusterWithSubsumedRange(ctx, t, 1 /* numNodes */, maxOffset)
-			defer tc.Stopper().Stop(ctx)
-			errCh := make(chan error)
-			go func() {
-				errCh <- sendWithTxn(store, rhsDesc, test.queryTsFunc(freezeStart), maxOffset,
-					test.queryArgsFunc(rhsDesc.StartKey.AsRawKey()))
-			}()
-			if test.shouldBlock {
-				waitForBlocked()
-				cleanupFunc()
-				// RHS should cease to exist once the merge completes but we cannot
-				// guarantee that the merge wasn't internally retried before it was able
-				// to successfully commit. If it did, requests blocked on the previous
-				// merge attempt might go through successfully. Thus, we cannot make any
-				// assertions about the result of these blocked requests.
-				<-errCh
-			} else {
-				require.NoError(t, <-errCh)
-				// We cleanup *after* the non-blocking read request succeeds to prevent
-				// it from racing with the merge commit trigger.
-				cleanupFunc()
-			}
-		})
-	}
-}
+// TODO(nvanbenschoten): fix this test. In b192bba, we allowed historical reads
+// up to the freeze start time on subsumed ranges. This turned out not to be
+// quite the right idea, because we can now ship the timestamp cache to the LHS
+// and be more optimal about the resulting timestamp cache on the joint range.
+// However, since we started allowing reads up to the freeze time, we were
+// effectively closing this time for all future writes on the joint range, so we
+// couldn't take advantage of the new ability to ship the timestamp cache
+// around. But the change was very well intentioned and revealed that we should
+// have no problem allowing reads below the closed timestamp on subsumed ranges.
+// Add support for this and update this test.
+//
+// // TestHistoricalReadsAfterSubsume tests that a subsumed right hand side range
+// // can only serve read-only traffic for timestamps that precede the subsumption
+// // time, but don't contain the subsumption time in their uncertainty interval.
+// func TestHistoricalReadsAfterSubsume(t *testing.T) {
+// 	defer leaktest.AfterTest(t)()
+// 	defer log.Scope(t).Close(t)
+// 	ctx := context.Background()
+
+// 	maxOffset := 100 * time.Millisecond
+// 	preUncertaintyTs := func(ts hlc.Timestamp) hlc.Timestamp {
+// 		return hlc.Timestamp{
+// 			WallTime: ts.GoTime().Add(-maxOffset).UnixNano() - 1,
+// 			Logical:  ts.Logical,
+// 		}
+// 	}

+// 	type testCase struct {
+// 		name          string
+// 		queryTsFunc   func(freezeStart hlc.Timestamp) hlc.Timestamp
+// 		queryArgsFunc func(key roachpb.Key) roachpb.Request
+// 		shouldBlock   bool
+// 	}

+// 	tests := []testCase{
+// 		// Ensure that a read query for a timestamp older than freezeStart-MaxOffset
+// 		// is let through.
+// 		{
+// 			name:        "historical read",
+// 			queryTsFunc: preUncertaintyTs,
+// 			queryArgsFunc: func(key roachpb.Key) roachpb.Request {
+// 				return getArgs(key)
+// 			},
+// 			shouldBlock: false,
+// 		},
+// 		// Write queries for the same historical timestamp should block (and then
+// 		// eventually fail because the range no longer exists).
+// 		{
+// 			name:        "historical write",
+// 			queryTsFunc: preUncertaintyTs,
+// 			queryArgsFunc: func(key roachpb.Key) roachpb.Request {
+// 				return putArgs(key, []byte(`test value`))
+// 			},
+// 			shouldBlock: true,
+// 		},
+// 		// Read queries that contain the subsumption time in its uncertainty interval
+// 		// should block and eventually fail.
+// 		{
+// 			name: "historical read with uncertainty",
+// 			queryTsFunc: func(freezeStart hlc.Timestamp) hlc.Timestamp {
+// 				return freezeStart.Prev()
+// 			},
+// 			queryArgsFunc: func(key roachpb.Key) roachpb.Request {
+// 				return getArgs(key)
+// 			},
+// 			shouldBlock: true,
+// 		},
+// 	}

+// 	for _, test := range tests {
+// 		t.Run(test.name, func(t *testing.T) {
+// 			tc, store, rhsDesc, freezeStart, waitForBlocked, cleanupFunc :=
+// 				setupClusterWithSubsumedRange(ctx, t, 1 /* numNodes */, maxOffset)
+// 			defer tc.Stopper().Stop(ctx)
+// 			errCh := make(chan error)
+// 			go func() {
+// 				errCh <- sendWithTxn(store, rhsDesc, test.queryTsFunc(freezeStart), maxOffset,
+// 					test.queryArgsFunc(rhsDesc.StartKey.AsRawKey()))
+// 			}()
+// 			if test.shouldBlock {
+// 				waitForBlocked()
+// 				cleanupFunc()
+// 				// RHS should cease to exist once the merge completes but we cannot
+// 				// guarantee that the merge wasn't internally retried before it was able
+// 				// to successfully commit. If it did, requests blocked on the previous
+// 				// merge attempt might go through successfully. Thus, we cannot make any
+// 				// assertions about the result of these blocked requests.
+// 				<-errCh
+// 			} else {
+// 				require.NoError(t, <-errCh)
+// 				// We cleanup *after* the non-blocking read request succeeds to prevent
+// 				// it from racing with the merge commit trigger.
+// 				cleanupFunc()
+// 			}
+// 		})
+// 	}
+// }
 
 // TestStoreBlockTransferLeaseRequestAfterSubsumption tests that a
 // TransferLeaseRequest checks & waits for an ongoing merge before it can be
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index cc6e73d1fa98..13368fe4e44d 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -964,10 +964,6 @@ func (r *Replica) mergeInProgressRLocked() bool {
 	return r.mu.mergeComplete != nil
 }
 
-func (r *Replica) getFreezeStartRLocked() hlc.Timestamp {
-	return r.mu.freezeStart
-}
-
 // setLastReplicaDescriptors sets the most recently seen replica
 // descriptors to those contained in the *RaftMessageRequest, acquiring r.mu
 // to do so.
@@ -1269,6 +1265,8 @@ func (r *Replica) checkExecutionCanProceed(
 		// a merge's critical phase (i.e. while the RHS of the merge is
 		// subsumed).
 		if err := r.shouldWaitForPendingMergeRLocked(ctx, ba); err != nil {
+			// TODO(nvanbenschoten): we should still be able to serve reads
+			// below the closed timestamp in this case.
 			return kvserverpb.LeaseStatus{}, err
 		}
 	}
@@ -1344,51 +1342,12 @@ func (r *Replica) shouldWaitForPendingMergeRLocked(
 		return nil
 	}
 
-	// TODO(nvanbenschoten): this isn't quite right. We shouldn't allow non-txn
-	// requests through here for the same reason why lease transfers can only
-	// allow concurrent reads > max_offset below the lease transfer time.
-	if ba.IsReadOnly() {
-		freezeStart := r.getFreezeStartRLocked()
-		ts := ba.Timestamp
-		if ba.Txn != nil {
-			ts.Forward(ba.Txn.GlobalUncertaintyLimit)
-		}
-		if ts.Less(freezeStart) {
-			// When the max timestamp of a read request is less than the subsumption
-			// time recorded by this Range (freezeStart), we're guaranteed that none
-			// of the writes accepted by the leaseholder for the keyspan (which could
-			// be a part of the subsuming range if the merge succeeded, or part of
-			// this range if it didn't) for timestamps after the subsumption timestamp
-			// could have causally preceded the current request. Letting such requests
-			// go through does not violate any of the invariants guaranteed by
-			// Subsume().
-			//
-			// NB: It would be incorrect to serve this read request if freezeStart
-			// were in its uncertainty window. For the sake of contradiction, consider
-			// the following scenario, if such a request were allowed to proceed:
-			// 1. This range gets subsumed, `maybeWatchForMerge` is called and the
-			// `mergeCompleteCh` channel is set up.
-			// 2. A read request *that succeeds the subsumption in real time* comes in
-			// for a timestamp that contains `freezeStart` in its uncertainty interval
-			// before the `mergeCompleteCh` channel is removed. Let's say the read
-			// timestamp of this request is X (with X <= freezeStart), and let's
-			// denote its uncertainty interval by [X, Y).
-			// 3. By the time this request reaches `shouldWaitForPendingMergeRLocked`, the
-			// merge has committed so all subsequent requests are directed to the
-			// leaseholder of the (subsuming) left-hand range but this pre-merge range
-			// hasn't been destroyed yet.
-			// 4. If the (post-merge) left-hand side leaseholder had accepted any new
-			// writes with timestamps in the window [freezeStart, Y), we would
-			// potentially have a stale read, as any of the writes in this window could
-			// have causally preceded the aforementioned read.
-			return nil
-		}
-	}
-	// This request cannot proceed until the merge completes, signaled by the
-	// closing of the channel.
+	// The replica is being merged into its left-hand neighbor. This request
+	// cannot proceed until the merge completes, signaled by the closing of the
+	// channel.
 	//
 	// It is very important that this check occur after we have acquired latches
-	// from the spanlatch manager. Only after we release these latches are we
+	// from the spanlatch manager. Only after we acquire these latches are we
 	// guaranteed that we're not racing with a Subsume command. (Subsume
 	// commands declare a conflict with all other commands.) It is also
 	// important that this check occur after we have verified that this replica
@@ -1485,32 +1444,53 @@ func (r *Replica) isNewerThanSplitRLocked(split *roachpb.SplitTrigger) bool {
 		r.mu.replicaID > rightDesc.ReplicaID
 }
 
-// maybeWatchForMerge checks whether a merge of this replica into its left
-// neighbor is in its critical phase and, if so, arranges to block all requests,
-// except for read-only requests that are older than `freezeStart`, until the
-// merge completes.
-func (r *Replica) maybeWatchForMerge(ctx context.Context, freezeStart hlc.Timestamp) error {
+// WatchForMerge is like maybeWatchForMergeLocked, except it expects a merge to
+// be in progress and returns an error if one is not.
+//
+// See docs/tech-notes/range-merges.md.
+func (r *Replica) WatchForMerge(ctx context.Context) error {
+	ok, err := r.maybeWatchForMerge(ctx)
+	if err != nil {
+		return err
+	} else if !ok {
+		return errors.AssertionFailedf("range merge unexpectedly not in-progress")
+	}
+	return nil
+}
+
+// maybeWatchForMerge checks whether a merge of this replica into its left
+// neighbor is in its critical phase and, if so, arranges to block all requests
+// until the merge completes. Returns a boolean indicating whether a merge was
+// found to be in progress.
+//
+// See docs/tech-notes/range-merges.md.
+func (r *Replica) maybeWatchForMerge(ctx context.Context) (bool, error) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
-	return r.maybeWatchForMergeLocked(ctx, freezeStart)
+	return r.maybeWatchForMergeLocked(ctx)
 }
 
-func (r *Replica) maybeWatchForMergeLocked(ctx context.Context, freezeStart hlc.Timestamp) error {
+func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
+	// Check for a deletion intent on the local range descriptor, which
+	// indicates that a merge is in progress and this range is currently in its
+	// critical phase of being subsumed by its left-hand side neighbor. Read
+	// inconsistently at the maximum timestamp to ensure that we see an intent
+	// if one exists, regardless of what timestamp it is written at.
 	desc := r.descRLocked()
 	descKey := keys.RangeDescriptorKey(desc.StartKey)
-	_, intent, err := storage.MVCCGet(ctx, r.Engine(), descKey, r.Clock().Now(),
+	_, intent, err := storage.MVCCGet(ctx, r.Engine(), descKey, hlc.MaxTimestamp,
 		storage.MVCCGetOptions{Inconsistent: true})
 	if err != nil {
-		return err
+		return false, err
 	} else if intent == nil {
-		return nil
+		return false, nil
 	}
 	val, _, err := storage.MVCCGetAsTxn(
 		ctx, r.Engine(), descKey, intent.Txn.WriteTimestamp, intent.Txn)
 	if err != nil {
-		return err
+		return false, err
 	} else if val != nil {
-		return nil
+		return false, nil
 	}
 
 	// At this point, we know we have a deletion intent on our range descriptor.
@@ -1523,14 +1503,8 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context, freezeStart hlc.
 		// Another request already noticed the merge, installed a mergeComplete
 		// channel, and launched a goroutine to watch for the merge's completion.
 		// Nothing more to do.
-		return nil
+		return true, nil
 	}
-	// Note that if the merge txn retries for any reason (for example, if the
-	// left-hand side range undergoes a lease transfer before the merge
-	// completes), the right-hand side range will get re-subsumed. This will
-	// lead to `freezeStart` being overwritten with the new subsumption time.
-	// This is fine.
-	r.mu.freezeStart = freezeStart
 	r.mu.mergeComplete = mergeCompleteCh
 	// The RHS of a merge is not permitted to quiesce while a mergeComplete
 	// channel is installed. (If the RHS is quiescent when the merge commits, any
@@ -1647,7 +1621,6 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context, freezeStart hlc.
 		// Unblock pending requests. If the merge committed, the requests will
 		// notice that the replica has been destroyed and return an appropriate
 		// error. If the merge aborted, the requests will be handled normally.
-		r.mu.freezeStart = hlc.Timestamp{}
 		r.mu.mergeComplete = nil
 		close(mergeCompleteCh)
 		r.mu.Unlock()
@@ -1661,7 +1634,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context, freezeStart hlc.
 		// requests will get dropped and retried on another node. Suppress the error.
 		err = nil
 	}
-	return err
+	return true, err
 }
 
 // maybeTransferRaftLeadershipToLeaseholderLocked attempts to transfer the
diff --git a/pkg/kv/kvserver/replica_eval_context_span.go b/pkg/kv/kvserver/replica_eval_context_span.go
index 5088a69a0357..93e7e97e8b9c 100644
--- a/pkg/kv/kvserver/replica_eval_context_span.go
+++ b/pkg/kv/kvserver/replica_eval_context_span.go
@@ -118,22 +118,6 @@ func (rec *SpanSetReplicaEvalContext) GetTracker() closedts.TrackerI {
 	return rec.i.GetTracker()
 }
 
-// GetFrozenClosedTimestamp is part of the EvalContext interface.
-func (rec *SpanSetReplicaEvalContext) GetFrozenClosedTimestamp() hlc.Timestamp {
-	// To capture a closed timestamp, all keys must be latched to prevent any
-	// concurrent writes (which could advance the closed timestamp).
-	desc := rec.i.Desc()
-	rec.ss.AssertAllowed(spanset.SpanReadWrite, roachpb.Span{
-		Key:    keys.MakeRangeKeyPrefix(desc.StartKey),
-		EndKey: keys.MakeRangeKeyPrefix(desc.EndKey),
-	})
-	rec.ss.AssertAllowed(spanset.SpanReadWrite, roachpb.Span{
-		Key:    desc.StartKey.AsRawKey(),
-		EndKey: desc.EndKey.AsRawKey(),
-	})
-	return rec.i.GetFrozenClosedTimestamp()
-}
-
 // IsFirstRange returns true iff the replica belongs to the first range.
 func (rec *SpanSetReplicaEvalContext) IsFirstRange() bool {
 	return rec.i.IsFirstRange()
@@ -228,7 +212,7 @@ func (rec SpanSetReplicaEvalContext) GetRangeInfo(ctx context.Context) roachpb.R
 }
 
 // GetCurrentReadSummary is part of the EvalContext interface.
-func (rec *SpanSetReplicaEvalContext) GetCurrentReadSummary() rspb.ReadSummary {
+func (rec *SpanSetReplicaEvalContext) GetCurrentReadSummary() (rspb.ReadSummary, hlc.Timestamp) {
 	// To capture a read summary over the range, all keys must be latched for
 	// writing to prevent any concurrent reads or writes.
 	desc := rec.i.Desc()
@@ -262,3 +246,14 @@ func (rec *SpanSetReplicaEvalContext) GetExternalStorageFromURI(
 ) (cloud.ExternalStorage, error) {
 	return rec.i.GetExternalStorageFromURI(ctx, uri, user)
 }
+
+// RevokeLease stops the replica from using its current lease.
+func (rec *SpanSetReplicaEvalContext) RevokeLease(ctx context.Context, seq roachpb.LeaseSequence) {
+	rec.i.RevokeLease(ctx, seq)
+}
+
+// WatchForMerge arranges to block all requests until the in-progress merge
+// completes.
+func (rec *SpanSetReplicaEvalContext) WatchForMerge(ctx context.Context) error {
+	return rec.i.WatchForMerge(ctx)
+}
diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go
index 4bbee7dfdb28..9bef96e47c58 100644
--- a/pkg/kv/kvserver/replica_follower_read.go
+++ b/pkg/kv/kvserver/replica_follower_read.go
@@ -158,17 +158,20 @@ func (r *Replica) maxClosedRLocked(ctx context.Context) (_ hlc.Timestamp, ok boo
 	return maxClosed, true
 }
 
-// GetFrozenClosedTimestamp returns the closed timestamp. Unlike
-// MaxClosedTimestamp, it only looks at the "new" closed timestamp mechanism,
-// ignoring the old one. It returns an empty result if the new mechanism is not
-// enabled yet. The new mechanism has better properties than the old one -
-// namely the closing of timestamps is synchronized with subsumption requests
-// (through latches). Callers who need that property should be prepared to get
-// an empty result back, meaning that the closed timestamp cannot be known.
-func (r *Replica) GetFrozenClosedTimestamp() hlc.Timestamp {
+// ClosedTimestampV2 returns the closed timestamp. Unlike MaxClosedTimestamp, it
+// only looks at the "new" closed timestamp mechanism, ignoring the old one. It
+// returns an empty result if the new mechanism is not enabled yet. The new
+// mechanism has better properties than the old one - namely the closing of
+// timestamps is synchronized with lease transfers and subsumption requests.
+// Callers who need that property should be prepared to get an empty result
+// back, meaning that the closed timestamp cannot be known.
+func (r *Replica) ClosedTimestampV2() hlc.Timestamp {
 	r.mu.RLock()
 	defer r.mu.RUnlock()
-	// TODO(andrei): Make sure that this synchronizes with the closed timestamps
-	// side-transport once the side-transport is written.
+	return r.closedTimestampV2RLocked()
+}
+
+func (r *Replica) closedTimestampV2RLocked() hlc.Timestamp {
+	// TODO(andrei,nvanbenschoten): include sideTransportClosedTimestamp.
 	return r.mu.state.ClosedTimestamp
 }
diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go
index 42b63760bb52..a5d8ff40f7db 100644
--- a/pkg/kv/kvserver/replica_learner_test.go
+++ b/pkg/kv/kvserver/replica_learner_test.go
@@ -735,8 +735,9 @@ func TestLearnerNoAcceptLease(t *testing.T) {
 	desc := tc.LookupRangeOrFatal(t, scratchStartKey)
 	err := tc.TransferRangeLease(desc, tc.Target(1))
-	if !testutils.IsError(err, `cannot transfer lease to replica of type LEARNER`) {
-		t.Fatalf(`expected "cannot transfer lease to replica of type LEARNER" error got: %+v`, err)
+	exp := `replica cannot hold lease`
+	if !testutils.IsError(err, exp) {
+		t.Fatalf(`expected %q error got: %+v`, exp, err)
 	}
 }
 
@@ -760,7 +761,7 @@ func TestJointConfigLease(t *testing.T) {
 	require.True(t, desc.Replicas().InAtomicReplicationChange(), desc)
 
 	err := tc.TransferRangeLease(desc, tc.Target(1))
-	exp := `cannot transfer lease to replica of type VOTER_INCOMING`
+	exp := `replica cannot hold lease`
 	require.True(t, testutils.IsError(err, exp), err)
 
 	// NB: we don't have to transition out of the previous joint config first
@@ -768,7 +769,6 @@ func TestJointConfigLease(t *testing.T) {
 	// it's asked to do.
 	desc = tc.RemoveVotersOrFatal(t, k, tc.Target(1))
 	err = tc.TransferRangeLease(desc, tc.Target(1))
-	exp = `cannot transfer lease to replica of type VOTER_DEMOTING_LEARNER`
 	require.True(t, testutils.IsError(err, exp), err)
 }
 
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index 2d3da2289028..5b25354c6104 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -396,13 +396,7 @@ func (r *Replica) leasePostApplyLocked(
 	// progress, as only the old leaseholder would have been explicitly notified
 	// of the merge. If there is a merge in progress, maybeWatchForMerge will
 	// arrange to block all traffic to this replica unless the merge aborts.
-	// NB: If the subsumed range changes leaseholders after subsumption,
-	// `freezeStart` will be zero and we will effectively be blocking all read
-	// requests.
-	// TODO(aayush): In the future, if we permit co-operative lease transfers
-	// when a range is subsumed, it should be relatively straightforward to
-	// allow historical reads on the subsumed RHS after such lease transfers.
-	if err := r.maybeWatchForMergeLocked(ctx, hlc.Timestamp{} /* freezeStart */); err != nil {
+	if _, err := r.maybeWatchForMergeLocked(ctx); err != nil {
 		// We were unable to determine whether a merge was in progress. We cannot
 		// safely proceed.
 		log.Fatalf(ctx, "failed checking for in-progress merge while installing new lease %s: %s",
@@ -629,9 +623,6 @@ func (r *Replica) handleReadWriteLocalEvalResult(ctx context.Context, lResult re
 	if lResult.EndTxns != nil {
 		log.Fatalf(ctx, "LocalEvalResult.EndTxns should be nil: %+v", lResult.EndTxns)
 	}
-	if !lResult.FreezeStart.IsEmpty() {
-		log.Fatalf(ctx, "LocalEvalResult.FreezeStart should have been handled and reset: %s", lResult.FreezeStart)
-	}
 
 	if lResult.AcquiredLocks != nil {
 		for i := range lResult.AcquiredLocks {
diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go
index 97cfffd64cbe..8557f23eed74 100644
--- a/pkg/kv/kvserver/replica_range_lease.go
+++ b/pkg/kv/kvserver/replica_range_lease.go
@@ -785,18 +785,6 @@ func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID
 		return nil, nil, errors.Errorf("unable to find store %d in range %+v", target, desc)
 	}
 
-	// For now, don't allow replicas of type LEARNER to be leaseholders, see
-	// comments in RequestLease and TransferLease for why.
-	//
-	// TODO(dan): We shouldn't need this, the checks in RequestLease and
-	// TransferLease are the canonical ones and should be sufficient. Sadly, the
-	// `r.mu.minLeaseProposedTS = status.Timestamp` line below will likely play
-	// badly with that. This would be an issue even without learners, but
-	// omitting this check would make it worse. Fixme.
-	if t := nextLeaseHolder.GetType(); t != roachpb.VOTER_FULL {
-		return nil, nil, errors.Errorf(`cannot transfer lease to replica of type %s`, t)
-	}
-
 	if nextLease, ok := r.mu.pendingLeaseRequest.RequestPending(); ok &&
 		nextLease.Replica != nextLeaseHolder {
 		repDesc, err := r.getReplicaDescriptorRLocked()
@@ -813,15 +801,7 @@ func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID
 		return nil, nil, newNotLeaseHolderError(nextLease, r.store.StoreID(), desc,
 			"another transfer to a different store is in progress")
 	}
-	// Stop using the current lease. All future calls to leaseStatus on this
-	// node with the current lease will now return a PROSCRIBED status.
-	//
-	// TODO(nvanbenschoten): since we aren't pulling the transfer time here
-	// anymore, we could also move this below latching as well, similar to
-	// how a Subsume request sets the FreezeStart time and pauses closed
-	// timestamps during evaluation and communicates this back up using the
-	// LocalResult.
-	r.mu.minLeaseProposedTS = now
+
 	transfer = r.mu.pendingLeaseRequest.InitOrJoinRequest(
 		ctx, nextLeaseHolder, status, desc.StartKey.AsRawKey(), true, /* transfer */
 	)
@@ -877,6 +857,17 @@ func (r *Replica) getLeaseRLocked() (roachpb.Lease, roachpb.Lease) {
 	return *r.mu.state.Lease, roachpb.Lease{}
 }
 
+// RevokeLease stops the replica from using its current lease, if that lease
+// matches the provided lease sequence. All future calls to leaseStatus on this
+// node with the current lease will now return a PROSCRIBED status.
+func (r *Replica) RevokeLease(ctx context.Context, seq roachpb.LeaseSequence) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	if r.mu.state.Lease.Sequence == seq {
+		r.mu.minLeaseProposedTS = r.Clock().NowAsClockTimestamp()
+	}
+}
+
 // newNotLeaseHolderError returns a NotLeaseHolderError initialized with the
 // replica for the holder (if any) of the given lease.
 //
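Putting the pieces together: AdminTransferLease no longer bumps minLeaseProposedTS eagerly; instead, evaluation calls the sequence-guarded RevokeLease shown above and only then captures the read summary. At the EvalContext level the contract amounts to the following sketch (a hypothetical helper, not part of this diff):

```go
// revokeThenSummarize sketches the contract TransferLease evaluation relies
// on: revoke the lease identified by prevSeq, then snapshot reads. If prevSeq
// is stale, RevokeLease is a no-op and the subsequent proposal is rejected
// below Raft with the usual lease mismatch error.
func revokeThenSummarize(
	ctx context.Context, ec batcheval.EvalContext, prevSeq roachpb.LeaseSequence,
) (rspb.ReadSummary, hlc.Timestamp) {
	ec.RevokeLease(ctx, prevSeq)
	return ec.GetCurrentReadSummary()
}
```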
diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go
index 5f614a4db004..5b77e3caad10 100644
--- a/pkg/kv/kvserver/replica_read.go
+++ b/pkg/kv/kvserver/replica_read.go
@@ -180,16 +180,6 @@ func (r *Replica) handleReadOnlyLocalEvalResult(
 		lResult.AcquiredLocks = nil
 	}
 
-	if !lResult.FreezeStart.IsEmpty() {
-		// A merge is (likely) about to be carried out, and this replica needs to
-		// block all non-read traffic until the merge either commits or aborts. See
-		// docs/tech-notes/range-merges.md.
-		if err := r.maybeWatchForMerge(ctx, lResult.FreezeStart); err != nil {
-			return roachpb.NewError(err)
-		}
-		lResult.FreezeStart = hlc.Timestamp{}
-	}
-
 	if !lResult.IsZero() {
 		log.Fatalf(ctx, "unhandled field in LocalEvalResult: %s", pretty.Diff(lResult, result.LocalResult{}))
 	}
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index 8a4cb17102cc..6d7fc678e039 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -741,7 +741,7 @@ func TestBehaviorDuringLeaseTransfer(t *testing.T) {
 		return nil
 	}
 	transferSem := make(chan struct{})
-	tsc.TestingKnobs.EvalKnobs.TestingEvalFilter =
+	tsc.TestingKnobs.EvalKnobs.TestingPostEvalFilter =
 		func(filterArgs kvserverbase.FilterArgs) *roachpb.Error {
 			if _, ok := filterArgs.Req.(*roachpb.TransferLeaseRequest); ok {
 				// Notify the test that the transfer has been trapped.
diff --git a/pkg/kv/kvserver/replica_tscache.go b/pkg/kv/kvserver/replica_tscache.go
index 614cf938b8cf..aaf37efc9fd8 100644
--- a/pkg/kv/kvserver/replica_tscache.go
+++ b/pkg/kv/kvserver/replica_tscache.go
@@ -548,12 +548,14 @@ func transactionPushMarker(key roachpb.Key, txnID uuid.UUID) roachpb.Key {
 
 // GetCurrentReadSummary returns a new ReadSummary reflecting all reads served
 // by the range to this point.
-func (r *Replica) GetCurrentReadSummary() rspb.ReadSummary {
+func (r *Replica) GetCurrentReadSummary() (rspb.ReadSummary, hlc.Timestamp) {
 	sum := collectReadSummaryFromTimestampCache(r.store.tsCache, r.Desc())
 	// Forward the read summary by the range's closed timestamp, because any
-	// replica could have served reads below this time.
-	sum.Merge(rspb.FromTimestamp(r.GetFrozenClosedTimestamp()))
-	return sum
+	// replica could have served reads below this time. We also return the
+	// closed timestamp separately, in case callers want it split out.
+	closedTS := r.ClosedTimestampV2()
+	sum.Merge(rspb.FromTimestamp(closedTS))
+	return sum, closedTS
 }
 
 // collectReadSummaryFromTimestampCache constructs a read summary for the range
diff --git a/pkg/util/log/flags.go b/pkg/util/log/flags.go
index 26d3e0d15856..9392366fc6f6 100644
--- a/pkg/util/log/flags.go
+++ b/pkg/util/log/flags.go
@@ -53,10 +53,8 @@ func init() {
 	// Default stderrThreshold to log everything to the process'
 	// external stderr (OrigStderr).
 	defaultConfig.Sinks.Stderr.Filter = severity.INFO
-	// We only register it for the DEV channels. No other
-	// channels get a configuration, whereby every channel
-	// ends up sharing the DEV logger (debugLog).
-	defaultConfig.Sinks.Stderr.Channels.Channels = []logpb.Channel{channel.DEV}
+	// Ensure all channels go to stderr.
+	defaultConfig.Sinks.Stderr.Channels.Channels = logconfig.SelectAllChannels()
 	// We also don't capture internal writes to fd2 by default:
 	// let the writes go to the external stderr.
 	defaultConfig.CaptureFd2.Enable = false
diff --git a/pkg/util/log/flags_test.go b/pkg/util/log/flags_test.go
index f0ce02a437ef..54a3b7e618c3 100644
--- a/pkg/util/log/flags_test.go
+++ b/pkg/util/log/flags_test.go
@@ -25,7 +25,7 @@ func TestAppliedStandaloneConfig(t *testing.T) {
 	const expected = `sinks:
   stderr:
-    channels: [DEV]
+    channels: all
     filter: INFO
     format: crdb-v2-tty
     redact: false
diff --git a/pkg/util/log/logconfig/config.go b/pkg/util/log/logconfig/config.go
index 2305f5545a1e..6fef5711191e 100644
--- a/pkg/util/log/logconfig/config.go
+++ b/pkg/util/log/logconfig/config.go
@@ -564,7 +564,7 @@ func parseChannelList(s string) ([]logpb.Channel, error) {
 	// Special case: "ALL" selects all channels.
 	if s == "ALL" {
-		return selectAllChannels(), nil
+		return SelectAllChannels(), nil
 	}
 
 	// If channels starts with "all except", we invert the selection.
@@ -597,7 +597,7 @@ func selectChannels(invert bool, parts []string) ([]logpb.Channel, error) {
 			if len(parts) != 1 {
 				return nil, errors.New("cannot use ALL if there are other channel names present in the list")
 			}
-			return selectAllChannels(), nil
+			return SelectAllChannels(), nil
 		}
 
 		// Verify the channel name is known.
@@ -636,9 +636,9 @@ func selectChannels(invert bool, parts []string) ([]logpb.Channel, error) {
 	return selected, nil
 }
 
-// selectAllChannels returns a copy of channelValues,
+// SelectAllChannels returns a copy of channelValues,
 // for use in the ALL configuration.
-func selectAllChannels() []logpb.Channel {
+func SelectAllChannels() []logpb.Channel {
 	// Copy the default in case the code that uses a Config overwrites
 	// its channel list in-place.
 	chans := make([]logpb.Channel, 0, len(channelValues))