Skip to content

Commit

Permalink
Merge #71243 #74333
Browse files Browse the repository at this point in the history
71243: rpc: add logging tags for rpc connect events r=andreimatei a=knz

Epic: CRDB-11517
First commit from #75237.

This ensures that the log messages sent to the DEV channel relating to
RPC low-level activity (e.g. conn failures, heartbeat etc) include the
remote node ID and address and connection class.

This also connects the heartbeat task to the tracing infra properly.

Example before:
```
[n6] 87  dialing n4: 127.0.0.1:41706 (default)
```

After:
```
[n6,rnode=4,raddr=127.0.0.1:41706,class=default] 87  dialing
```


74333: kvserver/loqrecovery: write replica recovery events to rangelog r=tbg,erikgrinaker a=aliher1911

Previously a fact of loss of quorum replica recovery was only written
as a structured log entry. This information is local to the node and
does not survive if node is decommissioned. It would be beneficial
to preserve this information longer. Range log while being limited
to 30 days still provide a good reference data for investigations if
recovery wasn't performed too long ago.
This patch adds entries to rangelog for every updated range first time
node that holds survivor replica is started after recovery.

Release note: None

Addresses #73679

Co-authored-by: Raphael 'kena' Poss <[email protected]>
Co-authored-by: Oleg Afanasyev <[email protected]>
  • Loading branch information
3 people committed Jan 21, 2022
3 parents 0a40796 + 256f2b7 + a7b2ee8 commit 22f45e5
Show file tree
Hide file tree
Showing 17 changed files with 342 additions and 66 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-46 set the active cluster version in the format '<major>.<minor>'
version version 21.2-48 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-46</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-48</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
5 changes: 3 additions & 2 deletions pkg/cli/debug_recover_loss_of_quorum.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,8 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {
}

for _, r := range prepReport.SkippedReplicas {
_, _ = fmt.Fprintf(stderr, "Replica %s for range r%d is already updated.\n", r.Replica, r.RangeID)
_, _ = fmt.Fprintf(stderr, "Replica %s for range r%d is already updated.\n",
r.Replica, r.RangeID())
}

if len(prepReport.UpdatedReplicas) == 0 {
Expand All @@ -459,7 +460,7 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {
for _, r := range prepReport.UpdatedReplicas {
message := fmt.Sprintf(
"Replica %s for range %d:%s will be updated to %s with peer replica(s) removed: %s",
r.OldReplica, r.RangeID, r.StartKey, r.Replica, r.RemovedReplicas)
r.OldReplica, r.RangeID(), r.StartKey(), r.Replica, r.RemovedReplicas)
if r.AbortedTransaction {
message += fmt.Sprintf(", and range update transaction %s aborted.",
r.AbortedTransactionID.Short())
Expand Down
14 changes: 14 additions & 0 deletions pkg/cli/debug_recover_loss_of_quorum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -200,4 +201,17 @@ func TestLossOfQuorumRecovery(t *testing.T) {
var replicas string
r.Scan(&replicas)
require.Equal(t, "{1,4,5}", replicas, "Replicas after loss of quorum recovery")

// Validate that rangelog is updated by recovery records after cluster restarts.
testutils.SucceedsSoon(t, func() error {
r := s.QueryRow(t,
`select count(*) from system.rangelog where "eventType" = 'unsafe_quorum_recovery'`)
var recoveries int
r.Scan(&recoveries)
if recoveries != len(plan.Updates) {
return errors.Errorf("found %d recovery events while expecting %d", recoveries,
len(plan.Updates))
}
return nil
})
}
8 changes: 8 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ const (
ScanWholeRows
// SCRAM authentication is available.
SCRAMAuthentication
// UnsafeLossOfQuorumRecoveryRangeLog adds a new value to RangeLogEventReason
// that correspond to range descriptor changes resulting from recovery
// procedures.
UnsafeLossOfQuorumRecoveryRangeLog

// *************************************************
// Step (1): Add new versions here.
Expand Down Expand Up @@ -372,6 +376,10 @@ var versionsSingleton = keyedVersions{
Key: SCRAMAuthentication,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 46},
},
{
Key: UnsafeLossOfQuorumRecoveryRangeLog,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 48},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvserverpb/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ const (
ReasonRebalance RangeLogEventReason = "rebalance"
ReasonAdminRequest RangeLogEventReason = "admin request"
ReasonAbandonedLearner RangeLogEventReason = "abandoned learner replica"
ReasonUnsafeRecovery RangeLogEventReason = "unsafe loss of quorum recovery"
)
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/kvserverpb/log.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ enum RangeLogEventType {
add_non_voter = 4;
// RemoveNonVoter is the event type recorded when a range removes an existing non-voting replica.
remove_non_voter = 5;
// UnsafeQuorumRecovery is the event type recorded when all replicas are
// replaced by a new one that acts as the source of truth possibly losing
// latest updates.
unsafe_quorum_recovery = 6;
}

message RangeLogEvent {
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/loqrecovery/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ go_library(
deps = [
"//pkg/keys",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/loqrecovery/loqrecoverypb",
"//pkg/kv/kvserver/stateloader",
"//pkg/roachpb:with-mocks",
"//pkg/storage",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
],
Expand All @@ -29,6 +31,7 @@ go_library(
go_test(
name = "loqrecovery_test",
srcs = [
"record_test.go",
"recovery_env_test.go",
"recovery_test.go",
],
Expand Down
22 changes: 16 additions & 6 deletions pkg/kv/kvserver/loqrecovery/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@ type PrepareStoreReport struct {
}

// PrepareReplicaReport contains information about prepared change for a replica.
// Its purpose is to inform user about actions to be performed.
// Its purpose is to inform user about actions to be performed. And create a record
// that would be applied to structured log, rangelog and other downstream systems
// for audit purposes.
type PrepareReplicaReport struct {
// Replica identification data.
RangeID roachpb.RangeID
StartKey roachpb.RKey
Replica roachpb.ReplicaDescriptor
Descriptor roachpb.RangeDescriptor
OldReplica roachpb.ReplicaDescriptor

// AlreadyUpdated is true if state of replica in store already matches desired
Expand All @@ -66,6 +67,16 @@ type PrepareReplicaReport struct {
AbortedTransactionID uuid.UUID
}

// RangeID of underlying range.
func (r PrepareReplicaReport) RangeID() roachpb.RangeID {
return r.Descriptor.RangeID
}

// StartKey of underlying range.
func (r PrepareReplicaReport) StartKey() roachpb.RKey {
return r.Descriptor.StartKey
}

// PrepareUpdateReplicas prepares all changes to be committed to provided stores
// as a first step of apply stage. This function would write changes to stores
// using provided batches and return a summary of changes that were done together
Expand Down Expand Up @@ -131,9 +142,7 @@ func applyReplicaUpdate(
) (PrepareReplicaReport, error) {
clock := hlc.NewClock(hlc.UnixNano, 0)
report := PrepareReplicaReport{
RangeID: update.RangeID,
Replica: update.NewReplica,
StartKey: update.StartKey.AsRKey(),
Replica: update.NewReplica,
}

// Write the rewritten descriptor to the range-local descriptor
Expand Down Expand Up @@ -275,6 +284,7 @@ func applyReplicaUpdate(
nil /* txn */, &newDesc); err != nil {
return PrepareReplicaReport{}, err
}
report.Descriptor = newDesc
report.RemovedReplicas = localDesc.Replicas()
report.OldReplica, _ = report.RemovedReplicas.RemoveReplica(
update.NewReplica.NodeID, update.NewReplica.StoreID)
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,6 @@ message ReplicaRecoveryRecord {
(gogoproto.moretags) = 'yaml:"OldReplicaID"'];
roachpb.ReplicaDescriptor new_replica = 6 [(gogoproto.nullable) = false,
(gogoproto.moretags) = 'yaml:"NewReplica"'];
roachpb.RangeDescriptor range_descriptor = 7 [(gogoproto.nullable) = false,
(gogoproto.moretags) = 'yaml:"RangeDescriptor"'];
}
66 changes: 56 additions & 10 deletions pkg/kv/kvserver/loqrecovery/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ package loqrecovery

import (
"context"
"encoding/json"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)
Expand All @@ -35,12 +38,13 @@ func writeReplicaRecoveryStoreRecord(
readWriter storage.ReadWriter,
) error {
record := loqrecoverypb.ReplicaRecoveryRecord{
Timestamp: timestamp,
RangeID: report.RangeID,
StartKey: update.StartKey,
EndKey: update.StartKey,
OldReplicaID: report.OldReplica.ReplicaID,
NewReplica: update.NewReplica,
Timestamp: timestamp,
RangeID: report.RangeID(),
StartKey: update.StartKey,
EndKey: update.StartKey,
OldReplicaID: report.OldReplica.ReplicaID,
NewReplica: update.NewReplica,
RangeDescriptor: report.Descriptor,
}

data, err := protoutil.Marshal(&record)
Expand All @@ -55,15 +59,12 @@ func writeReplicaRecoveryStoreRecord(
}

// RegisterOfflineRecoveryEvents checks if recovery data was captured in the
// store and notifies callback about all registered events. Its up to the
// store and notifies callback about all registered events. It's up to the
// callback function to send events where appropriate. Events are removed
// from the store unless callback returns false or error. If latter case events
// would be reprocessed on subsequent call to this function.
// This function is called on startup to ensure that any offline replica
// recovery actions are properly reflected in server logs as needed.
//
// TODO(oleg): #73679 add events to the rangelog. That would require registering
// events at the later stage of startup when SQL is already available.
func RegisterOfflineRecoveryEvents(
ctx context.Context,
readWriter storage.ReadWriter,
Expand Down Expand Up @@ -118,3 +119,48 @@ func RegisterOfflineRecoveryEvents(
}
return successCount, nil
}

// UpdateRangeLogWithRecovery inserts a range log update to system.rangelog
// using information from recovery event.
func UpdateRangeLogWithRecovery(
ctx context.Context,
sqlExec func(ctx context.Context, stmt string, args ...interface{}) (int, error),
event loqrecoverypb.ReplicaRecoveryRecord,
) error {
const insertEventTableStmt = `
INSERT INTO system.rangelog (
timestamp, "rangeID", "storeID", "eventType", "otherRangeID", info
)
VALUES(
$1, $2, $3, $4, $5, $6
)
`
updateInfo := kvserverpb.RangeLogEvent_Info{
UpdatedDesc: &event.RangeDescriptor,
AddedReplica: &event.NewReplica,
Reason: kvserverpb.ReasonUnsafeRecovery,
Details: "Performed unsafe range loss of quorum recovery",
}
infoBytes, err := json.Marshal(updateInfo)
if err != nil {
return errors.Wrap(err, "failed to serialize a RangeLog info entry")
}
args := []interface{}{
timeutil.Unix(0, event.Timestamp),
event.RangeID,
event.NewReplica.StoreID,
kvserverpb.RangeLogEventType_unsafe_quorum_recovery.String(),
nil, // otherRangeID
string(infoBytes),
}

rows, err := sqlExec(ctx, insertEventTableStmt, args...)
if err != nil {
return errors.Wrap(err, "failed to insert a RangeLog entry")
}
if rows != 1 {
return errors.Errorf("%d row(s) affected by RangeLog insert while expected 1",
rows)
}
return nil
}
101 changes: 101 additions & 0 deletions pkg/kv/kvserver/loqrecovery/record_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package loqrecovery

import (
"context"
"errors"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

// TestPublishRangeLogEvents verifies that inserting recovery events into
// RangeLog handles sql execution errors and unexpected results by propagating
// errors up. This is important as caller relies on errors to preserve events if
// they were not reflected in RangeLog.
// It also performs basic sanity check that inserted records have correct range
// id and reason for update and a timestamp.
func TestPublishRangeLogEvents(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

for _, td := range []struct {
name string

// Recovery event and function under test arguments.
rangeID roachpb.RangeID
time int64

// Callback results returned to function under test.
returnedRowCount int
queryExecError error

// Expectations in callback and call results.
expectSuccess bool
}{
{
name: "success",
rangeID: 7,
time: 1021,
returnedRowCount: 1,
expectSuccess: true,
},
{
name: "sql error",
rangeID: 7,
time: 1021,
returnedRowCount: 1,
queryExecError: errors.New("stray sql error occurred"),
},
{
name: "wrong row count",
rangeID: 7,
time: 1021,
returnedRowCount: 0,
expectSuccess: false,
},
} {
t.Run(td.name, func(t *testing.T) {
var actualArgs []interface{}
execFn := func(ctx context.Context, stmt string, args ...interface{}) (int, error) {
actualArgs = args
return td.returnedRowCount, td.queryExecError
}

event := loqrecoverypb.ReplicaRecoveryRecord{
Timestamp: td.time,
RangeID: td.rangeID,
StartKey: loqrecoverypb.RecoveryKey(roachpb.RKeyMin),
EndKey: loqrecoverypb.RecoveryKey(roachpb.RKeyMax),
}

err := UpdateRangeLogWithRecovery(ctx, execFn, event)
if td.expectSuccess {
require.NoError(t, err)
} else {
require.Error(t, err)
}
require.Equal(t, 6, len(actualArgs), "not enough query args were provided")
require.Contains(t, actualArgs[5], "Performed unsafe range loss of quorum recovery")
require.Equal(t, td.rangeID, actualArgs[1], "RangeID query arg doesn't match event")
require.Equal(t, timeutil.Unix(0, td.time), actualArgs[0],
"timestamp query arg doesn't match event")
require.Equal(t, kvserverpb.RangeLogEventType_unsafe_quorum_recovery.String(), actualArgs[3],
"incorrect RangeLog event type")
})
}
}
Loading

0 comments on commit 22f45e5

Please sign in to comment.