diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 247d0ecde0c6..82e5e8319019 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -174,4 +174,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 21.2-46 set the active cluster version in the format '.' +version version 21.2-48 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 89fd88f575c1..055c598e7107 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -181,6 +181,6 @@ trace.jaeger.agentstringthe address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion21.2-46set the active cluster version in the format '.' +versionversion21.2-48set the active cluster version in the format '.' 
diff --git a/pkg/cli/debug_recover_loss_of_quorum.go b/pkg/cli/debug_recover_loss_of_quorum.go index 552e962a0128..d297561722e2 100644 --- a/pkg/cli/debug_recover_loss_of_quorum.go +++ b/pkg/cli/debug_recover_loss_of_quorum.go @@ -444,7 +444,8 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error { } for _, r := range prepReport.SkippedReplicas { - _, _ = fmt.Fprintf(stderr, "Replica %s for range r%d is already updated.\n", r.Replica, r.RangeID) + _, _ = fmt.Fprintf(stderr, "Replica %s for range r%d is already updated.\n", + r.Replica, r.RangeID()) } if len(prepReport.UpdatedReplicas) == 0 { @@ -459,7 +460,7 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error { for _, r := range prepReport.UpdatedReplicas { message := fmt.Sprintf( "Replica %s for range %d:%s will be updated to %s with peer replica(s) removed: %s", - r.OldReplica, r.RangeID, r.StartKey, r.Replica, r.RemovedReplicas) + r.OldReplica, r.RangeID(), r.StartKey(), r.Replica, r.RemovedReplicas) if r.AbortedTransaction { message += fmt.Sprintf(", and range update transaction %s aborted.", r.AbortedTransactionID.Short()) diff --git a/pkg/cli/debug_recover_loss_of_quorum_test.go b/pkg/cli/debug_recover_loss_of_quorum_test.go index e9b62988bd3e..b94bb5ce270c 100644 --- a/pkg/cli/debug_recover_loss_of_quorum_test.go +++ b/pkg/cli/debug_recover_loss_of_quorum_test.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -200,4 +201,17 @@ func TestLossOfQuorumRecovery(t *testing.T) { var replicas string r.Scan(&replicas) require.Equal(t, "{1,4,5}", replicas, "Replicas after loss of quorum recovery") + + // Validate that rangelog is updated by recovery records after cluster restarts. + testutils.SucceedsSoon(t, func() error { + r := s.QueryRow(t, + `select count(*) from system.rangelog where "eventType" = 'unsafe_quorum_recovery'`) + var recoveries int + r.Scan(&recoveries) + if recoveries != len(plan.Updates) { + return errors.Errorf("found %d recovery events while expecting %d", recoveries, + len(plan.Updates)) + } + return nil + }) } diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 32901b359130..2e34b8aa41f5 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -244,6 +244,10 @@ const ( ScanWholeRows // SCRAM authentication is available. SCRAMAuthentication + // UnsafeLossOfQuorumRecoveryRangeLog adds a new value to RangeLogEventReason + // that corresponds to range descriptor changes resulting from recovery + // procedures. + UnsafeLossOfQuorumRecoveryRangeLog // ************************************************* // Step (1): Add new versions here. @@ -372,6 +376,10 @@ var versionsSingleton = keyedVersions{ Key: SCRAMAuthentication, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 46}, }, + { + Key: UnsafeLossOfQuorumRecoveryRangeLog, + Version: roachpb.Version{Major: 21, Minor: 2, Internal: 48}, + }, // ************************************************* // Step (2): Add new versions here. 
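Reviewer note, not part of the patch: the UnsafeLossOfQuorumRecoveryRangeLog key added above only acts as a gate, since nodes that have not finished upgrading do not understand the new unsafe_quorum_recovery rangelog value. Below is a minimal sketch of the gating pattern, mirroring the Version.IsActive check that server.go adds later in this diff; the package and function names here are illustrative assumptions rather than code from the change.

    package example // illustrative wrapper only, not part of the patch

    import (
        "context"

        "github.com/cockroachdb/cockroach/pkg/clusterversion"
        "github.com/cockroachdb/cockroach/pkg/settings/cluster"
    )

    // canWriteRecoveryRangeLog reports whether every node understands the new
    // unsafe_quorum_recovery rangelog event, i.e. whether the version gate
    // added above has been activated.
    func canWriteRecoveryRangeLog(ctx context.Context, st *cluster.Settings) bool {
        return st.Version.IsActive(ctx, clusterversion.UnsafeLossOfQuorumRecoveryRangeLog)
    }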
diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index f4ac61daff19..107d7ac4868d 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -32,11 +32,12 @@ func _() { _ = x[EnableSpanConfigStore-21] _ = x[ScanWholeRows-22] _ = x[SCRAMAuthentication-23] + _ = x[UnsafeLossOfQuorumRecoveryRangeLog-24] } -const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsAlterSystemProtectedTimestampAddColumnEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthentication" +const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsAlterSystemProtectedTimestampAddColumnEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLog" -var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 463, 493, 521, 542, 555, 574} +var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 463, 493, 521, 542, 555, 574, 608} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/kv/kvserver/kvserverpb/log.go b/pkg/kv/kvserver/kvserverpb/log.go index 6a4b42f65894..64c64f0557f5 100644 --- a/pkg/kv/kvserver/kvserverpb/log.go +++ b/pkg/kv/kvserver/kvserverpb/log.go @@ -23,4 +23,5 @@ const ( ReasonRebalance RangeLogEventReason = "rebalance" ReasonAdminRequest RangeLogEventReason = "admin request" ReasonAbandonedLearner RangeLogEventReason = "abandoned learner replica" + ReasonUnsafeRecovery RangeLogEventReason = "unsafe loss of quorum recovery" ) diff --git a/pkg/kv/kvserver/kvserverpb/log.proto b/pkg/kv/kvserver/kvserverpb/log.proto index d775de4618e1..2c6d9b40c90b 100644 --- a/pkg/kv/kvserver/kvserverpb/log.proto +++ b/pkg/kv/kvserver/kvserverpb/log.proto @@ -31,6 +31,10 @@ enum RangeLogEventType { add_non_voter = 4; // RemoveNonVoter is the event type recorded when a range removes an existing non-voting replica. remove_non_voter = 5; + // UnsafeQuorumRecovery is the event type recorded when all replicas are + // replaced by a new one that acts as the source of truth possibly losing + // latest updates. 
+ unsafe_quorum_recovery = 6; } message RangeLogEvent { diff --git a/pkg/kv/kvserver/loqrecovery/BUILD.bazel b/pkg/kv/kvserver/loqrecovery/BUILD.bazel index 70777b51d74d..ee37ba00f2c0 100644 --- a/pkg/kv/kvserver/loqrecovery/BUILD.bazel +++ b/pkg/kv/kvserver/loqrecovery/BUILD.bazel @@ -14,6 +14,7 @@ go_library( deps = [ "//pkg/keys", "//pkg/kv/kvserver", + "//pkg/kv/kvserver/kvserverpb", "//pkg/kv/kvserver/loqrecovery/loqrecoverypb", "//pkg/kv/kvserver/stateloader", "//pkg/roachpb:with-mocks", @@ -21,6 +22,7 @@ go_library( "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/protoutil", + "//pkg/util/timeutil", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", ], @@ -29,6 +31,7 @@ go_library( go_test( name = "loqrecovery_test", srcs = [ + "record_test.go", "recovery_env_test.go", "recovery_test.go", ], diff --git a/pkg/kv/kvserver/loqrecovery/apply.go b/pkg/kv/kvserver/loqrecovery/apply.go index bb11b9f461c2..b180ad61c315 100644 --- a/pkg/kv/kvserver/loqrecovery/apply.go +++ b/pkg/kv/kvserver/loqrecovery/apply.go @@ -43,12 +43,13 @@ type PrepareStoreReport struct { } // PrepareReplicaReport contains information about prepared change for a replica. -// Its purpose is to inform user about actions to be performed. +// Its purpose is to inform the user about actions to be performed, and to create a record +// that is then applied to the structured log, rangelog, and other downstream systems +// for audit purposes. type PrepareReplicaReport struct { // Replica identification data. - RangeID roachpb.RangeID - StartKey roachpb.RKey Replica roachpb.ReplicaDescriptor + Descriptor roachpb.RangeDescriptor OldReplica roachpb.ReplicaDescriptor // AlreadyUpdated is true if state of replica in store already matches desired @@ -66,6 +67,16 @@ type PrepareReplicaReport struct { AbortedTransactionID uuid.UUID } +// RangeID of underlying range. +func (r PrepareReplicaReport) RangeID() roachpb.RangeID { + return r.Descriptor.RangeID +} + +// StartKey of underlying range. +func (r PrepareReplicaReport) StartKey() roachpb.RKey { + return r.Descriptor.StartKey +} + // PrepareUpdateReplicas prepares all changes to be committed to provided stores // as a first step of apply stage.
This function would write changes to stores // using provided batches and return a summary of changes that were done together @@ -131,9 +142,7 @@ func applyReplicaUpdate( ) (PrepareReplicaReport, error) { clock := hlc.NewClock(hlc.UnixNano, 0) report := PrepareReplicaReport{ - RangeID: update.RangeID, - Replica: update.NewReplica, - StartKey: update.StartKey.AsRKey(), + Replica: update.NewReplica, } // Write the rewritten descriptor to the range-local descriptor @@ -275,6 +284,7 @@ func applyReplicaUpdate( nil /* txn */, &newDesc); err != nil { return PrepareReplicaReport{}, err } + report.Descriptor = newDesc report.RemovedReplicas = localDesc.Replicas() report.OldReplica, _ = report.RemovedReplicas.RemoveReplica( update.NewReplica.NodeID, update.NewReplica.StoreID) diff --git a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto index 60b1ba51d5a1..c8145e494012 100644 --- a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto +++ b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto @@ -89,4 +89,6 @@ message ReplicaRecoveryRecord { (gogoproto.moretags) = 'yaml:"OldReplicaID"']; roachpb.ReplicaDescriptor new_replica = 6 [(gogoproto.nullable) = false, (gogoproto.moretags) = 'yaml:"NewReplica"']; + roachpb.RangeDescriptor range_descriptor = 7 [(gogoproto.nullable) = false, + (gogoproto.moretags) = 'yaml:"RangeDescriptor"']; } diff --git a/pkg/kv/kvserver/loqrecovery/record.go b/pkg/kv/kvserver/loqrecovery/record.go index 4f045cb55eb6..d043268ec644 100644 --- a/pkg/kv/kvserver/loqrecovery/record.go +++ b/pkg/kv/kvserver/loqrecovery/record.go @@ -12,11 +12,14 @@ package loqrecovery import ( "context" + "encoding/json" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" ) @@ -35,12 +38,13 @@ func writeReplicaRecoveryStoreRecord( readWriter storage.ReadWriter, ) error { record := loqrecoverypb.ReplicaRecoveryRecord{ - Timestamp: timestamp, - RangeID: report.RangeID, - StartKey: update.StartKey, - EndKey: update.StartKey, - OldReplicaID: report.OldReplica.ReplicaID, - NewReplica: update.NewReplica, + Timestamp: timestamp, + RangeID: report.RangeID(), + StartKey: update.StartKey, + EndKey: update.StartKey, + OldReplicaID: report.OldReplica.ReplicaID, + NewReplica: update.NewReplica, + RangeDescriptor: report.Descriptor, } data, err := protoutil.Marshal(&record) @@ -55,15 +59,12 @@ func writeReplicaRecoveryStoreRecord( } // RegisterOfflineRecoveryEvents checks if recovery data was captured in the -// store and notifies callback about all registered events. Its up to the +// store and notifies callback about all registered events. It's up to the // callback function to send events where appropriate. Events are removed // from the store unless callback returns false or error. If latter case events // would be reprocessed on subsequent call to this function. // This function is called on startup to ensure that any offline replica // recovery actions are properly reflected in server logs as needed. -// -// TODO(oleg): #73679 add events to the rangelog. That would require registering -// events at the later stage of startup when SQL is already available. 
func RegisterOfflineRecoveryEvents( ctx context.Context, readWriter storage.ReadWriter, @@ -118,3 +119,48 @@ func RegisterOfflineRecoveryEvents( } return successCount, nil } + +// UpdateRangeLogWithRecovery inserts a range log update to system.rangelog +// using information from recovery event. +func UpdateRangeLogWithRecovery( + ctx context.Context, + sqlExec func(ctx context.Context, stmt string, args ...interface{}) (int, error), + event loqrecoverypb.ReplicaRecoveryRecord, +) error { + const insertEventTableStmt = ` + INSERT INTO system.rangelog ( + timestamp, "rangeID", "storeID", "eventType", "otherRangeID", info + ) + VALUES( + $1, $2, $3, $4, $5, $6 + ) + ` + updateInfo := kvserverpb.RangeLogEvent_Info{ + UpdatedDesc: &event.RangeDescriptor, + AddedReplica: &event.NewReplica, + Reason: kvserverpb.ReasonUnsafeRecovery, + Details: "Performed unsafe range loss of quorum recovery", + } + infoBytes, err := json.Marshal(updateInfo) + if err != nil { + return errors.Wrap(err, "failed to serialize a RangeLog info entry") + } + args := []interface{}{ + timeutil.Unix(0, event.Timestamp), + event.RangeID, + event.NewReplica.StoreID, + kvserverpb.RangeLogEventType_unsafe_quorum_recovery.String(), + nil, // otherRangeID + string(infoBytes), + } + + rows, err := sqlExec(ctx, insertEventTableStmt, args...) + if err != nil { + return errors.Wrap(err, "failed to insert a RangeLog entry") + } + if rows != 1 { + return errors.Errorf("%d row(s) affected by RangeLog insert while expected 1", + rows) + } + return nil +} diff --git a/pkg/kv/kvserver/loqrecovery/record_test.go b/pkg/kv/kvserver/loqrecovery/record_test.go new file mode 100644 index 000000000000..8e26f8ee39e5 --- /dev/null +++ b/pkg/kv/kvserver/loqrecovery/record_test.go @@ -0,0 +1,101 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package loqrecovery + +import ( + "context" + "errors" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +// TestPublishRangeLogEvents verifies that inserting recovery events into +// RangeLog handles sql execution errors and unexpected results by propagating +// errors up. This is important as caller relies on errors to preserve events if +// they were not reflected in RangeLog. +// It also performs basic sanity check that inserted records have correct range +// id and reason for update and a timestamp. +func TestPublishRangeLogEvents(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + for _, td := range []struct { + name string + + // Recovery event and function under test arguments. + rangeID roachpb.RangeID + time int64 + + // Callback results returned to function under test. + returnedRowCount int + queryExecError error + + // Expectations in callback and call results. 
+ expectSuccess bool + }{ + { + name: "success", + rangeID: 7, + time: 1021, + returnedRowCount: 1, + expectSuccess: true, + }, + { + name: "sql error", + rangeID: 7, + time: 1021, + returnedRowCount: 1, + queryExecError: errors.New("stray sql error occurred"), + }, + { + name: "wrong row count", + rangeID: 7, + time: 1021, + returnedRowCount: 0, + expectSuccess: false, + }, + } { + t.Run(td.name, func(t *testing.T) { + var actualArgs []interface{} + execFn := func(ctx context.Context, stmt string, args ...interface{}) (int, error) { + actualArgs = args + return td.returnedRowCount, td.queryExecError + } + + event := loqrecoverypb.ReplicaRecoveryRecord{ + Timestamp: td.time, + RangeID: td.rangeID, + StartKey: loqrecoverypb.RecoveryKey(roachpb.RKeyMin), + EndKey: loqrecoverypb.RecoveryKey(roachpb.RKeyMax), + } + + err := UpdateRangeLogWithRecovery(ctx, execFn, event) + if td.expectSuccess { + require.NoError(t, err) + } else { + require.Error(t, err) + } + require.Equal(t, 6, len(actualArgs), "not enough query args were provided") + require.Contains(t, actualArgs[5], "Performed unsafe range loss of quorum recovery") + require.Equal(t, td.rangeID, actualArgs[1], "RangeID query arg doesn't match event") + require.Equal(t, timeutil.Unix(0, td.time), actualArgs[0], + "timestamp query arg doesn't match event") + require.Equal(t, kvserverpb.RangeLogEventType_unsafe_quorum_recovery.String(), actualArgs[3], + "incorrect RangeLog event type") + }) + } +} diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index babf390fc358..4cdd89f8e131 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -41,6 +41,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" "github.com/cockroachdb/redact" "go.opentelemetry.io/otel/attribute" "golang.org/x/sync/syncmap" @@ -1132,6 +1133,20 @@ type delayingHeader struct { DelayMS int32 } +func (rpcCtx *Context) makeDialCtx( + target string, remoteNodeID roachpb.NodeID, class ConnectionClass, +) context.Context { + dialCtx := rpcCtx.masterCtx + var rnodeID interface{} = remoteNodeID + if remoteNodeID == 0 { + rnodeID = '?' + } + dialCtx = logtags.AddTag(dialCtx, "rnode", rnodeID) + dialCtx = logtags.AddTag(dialCtx, "raddr", target) + dialCtx = logtags.AddTag(dialCtx, "class", class) + return dialCtx +} + // GRPCDialRaw calls grpc.Dial with options appropriate for the context. // Unlike GRPCDialNode, it does not start an RPC heartbeat to validate the // connection. This connection will not be reconnected automatically; @@ -1139,11 +1154,15 @@ type delayingHeader struct { // This method implies a DefaultClass ConnectionClass for the returned // ClientConn. func (rpcCtx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, <-chan struct{}, error) { - return rpcCtx.grpcDialRaw(target, 0, DefaultClass) + ctx := rpcCtx.makeDialCtx(target, 0, DefaultClass) + return rpcCtx.grpcDialRaw(ctx, target, 0, DefaultClass) } +// grpcDialRaw connects to the remote node. +// The ctx passed as argument must be derived from rpcCtx.masterCtx, so +// that it respects the same cancellation policy. 
func (rpcCtx *Context) grpcDialRaw( - target string, remoteNodeID roachpb.NodeID, class ConnectionClass, + ctx context.Context, target string, remoteNodeID roachpb.NodeID, class ConnectionClass, ) (*grpc.ClientConn, <-chan struct{}, error) { dialOpts, err := rpcCtx.grpcDialOptions(target, class) if err != nil { @@ -1174,7 +1193,7 @@ func (rpcCtx *Context) grpcDialRaw( dialerFunc := dialer.dial if rpcCtx.Knobs.ArtificialLatencyMap != nil { latency := rpcCtx.Knobs.ArtificialLatencyMap[target] - log.VEventf(rpcCtx.masterCtx, 1, "connecting to node %s (%d) with simulated latency %dms", target, remoteNodeID, + log.VEventf(ctx, 1, "connecting with simulated latency %dms", latency) dialer := artificialLatencyDialer{ dialerFunc: dialerFunc, @@ -1189,8 +1208,8 @@ func (rpcCtx *Context) grpcDialRaw( // behavior and redialChan will never be closed). dialOpts = append(dialOpts, rpcCtx.testingDialOpts...) - log.Health.Infof(rpcCtx.masterCtx, "dialing n%v: %s (%v)", remoteNodeID, target, class) - conn, err := grpc.DialContext(rpcCtx.masterCtx, target, dialOpts...) + log.Health.Infof(ctx, "dialing") + conn, err := grpc.DialContext(ctx, target, dialOpts...) if err != nil && rpcCtx.masterCtx.Err() != nil { // If the node is draining, discard the error (which is likely gRPC's version // of context.Canceled) and return errDialRejected which instructs callers not @@ -1205,7 +1224,8 @@ func (rpcCtx *Context) grpcDialRaw( // used with the gossip client and CLI commands which can talk to any // node. This method implies a SystemClass. func (rpcCtx *Context) GRPCUnvalidatedDial(target string) *Connection { - return rpcCtx.grpcDialNodeInternal(target, 0, SystemClass) + ctx := rpcCtx.makeDialCtx(target, 0, SystemClass) + return rpcCtx.grpcDialNodeInternal(ctx, target, 0, SystemClass) } // GRPCDialNode calls grpc.Dial with options appropriate for the @@ -1218,10 +1238,11 @@ func (rpcCtx *Context) GRPCUnvalidatedDial(target string) *Connection { func (rpcCtx *Context) GRPCDialNode( target string, remoteNodeID roachpb.NodeID, class ConnectionClass, ) *Connection { + ctx := rpcCtx.makeDialCtx(target, remoteNodeID, class) if remoteNodeID == 0 && !rpcCtx.TestingAllowNamedRPCToAnonymousServer { - log.Fatalf(context.TODO(), "%v", errors.AssertionFailedf("invalid node ID 0 in GRPCDialNode()")) + log.Fatalf(ctx, "%v", errors.AssertionFailedf("invalid node ID 0 in GRPCDialNode()")) } - return rpcCtx.grpcDialNodeInternal(target, remoteNodeID, class) + return rpcCtx.grpcDialNodeInternal(ctx, target, remoteNodeID, class) } // GRPCDialPod wraps GRPCDialNode and treats the `remoteInstanceID` @@ -1237,8 +1258,11 @@ func (rpcCtx *Context) GRPCDialPod( return rpcCtx.GRPCDialNode(target, roachpb.NodeID(remoteInstanceID), class) } +// grpcDialNodeInternal connects to the remote node and sets up the async heartbeater. +// The ctx passed as argument must be derived from rpcCtx.masterCtx, so +// that it respects the same cancellation policy. func (rpcCtx *Context) grpcDialNodeInternal( - target string, remoteNodeID roachpb.NodeID, class ConnectionClass, + ctx context.Context, target string, remoteNodeID roachpb.NodeID, class ConnectionClass, ) *Connection { thisConnKeys := []connKey{{target, remoteNodeID, class}} value, ok := rpcCtx.conns.Load(thisConnKeys[0]) @@ -1273,14 +1297,15 @@ func (rpcCtx *Context) grpcDialNodeInternal( // Either we kick off the heartbeat loop (and clean up when it's done), // or we clean up the connKey entries immediately. 
var redialChan <-chan struct{} - conn.grpcConn, redialChan, conn.dialErr = rpcCtx.grpcDialRaw(target, remoteNodeID, class) + conn.grpcConn, redialChan, conn.dialErr = rpcCtx.grpcDialRaw(ctx, target, remoteNodeID, class) if conn.dialErr == nil { if err := rpcCtx.Stopper.RunAsyncTask( - rpcCtx.masterCtx, "rpc.Context: grpc heartbeat", func(masterCtx context.Context) { - err := rpcCtx.runHeartbeat(conn, target, redialChan) + logtags.AddTag(ctx, "heartbeat", nil), + "rpc.Context: grpc heartbeat", func(ctx context.Context) { + err := rpcCtx.runHeartbeat(ctx, conn, target, redialChan) if err != nil && !grpcutil.IsClosedConnection(err) && !grpcutil.IsConnectionRejected(err) { - log.Health.Errorf(masterCtx, "removing connection to %s due to error: %v", target, err) + log.Health.Errorf(ctx, "removing connection to %s due to error: %v", target, err) } rpcCtx.removeConn(conn, thisConnKeys...) }); err != nil { @@ -1316,8 +1341,11 @@ var ErrNotHeartbeated = errors.New("not yet heartbeated") // the node. var ErrNoConnection = errors.New("no connection found") +// runHeartbeat runs the heartbeat loop for the given RPC connection. +// The ctx passed as argument must be derived from rpcCtx.masterCtx, so +// that it respects the same cancellation policy. func (rpcCtx *Context) runHeartbeat( - conn *Connection, target string, redialChan <-chan struct{}, + ctx context.Context, conn *Connection, target string, redialChan <-chan struct{}, ) (retErr error) { rpcCtx.metrics.HeartbeatLoopsStarted.Inc(1) // setInitialHeartbeatDone is idempotent and is critical to notify Connect @@ -1365,7 +1393,7 @@ func (rpcCtx *Context) runHeartbeat( heartbeatTimer.Read = true } - if err := rpcCtx.Stopper.RunTaskWithErr(rpcCtx.masterCtx, "rpc heartbeat", func(goCtx context.Context) error { + if err := rpcCtx.Stopper.RunTaskWithErr(ctx, "rpc heartbeat", func(ctx context.Context) error { // We re-mint the PingRequest to pick up any asynchronous update to clusterID. clusterID := rpcCtx.ClusterID.Get() request := &PingRequest{ @@ -1384,7 +1412,7 @@ func (rpcCtx *Context) runHeartbeat( var response *PingResponse sendTime := rpcCtx.Clock.PhysicalTime() - ping := func(goCtx context.Context) error { + ping := func(ctx context.Context) error { // NB: We want the request to fail-fast (the default), otherwise we won't // be notified of transport failures. 
if err := interceptor(request); err != nil { @@ -1392,14 +1420,14 @@ func (rpcCtx *Context) runHeartbeat( return err } var err error - response, err = heartbeatClient.Ping(goCtx, request) + response, err = heartbeatClient.Ping(ctx, request) return err } var err error if rpcCtx.heartbeatTimeout > 0 { - err = contextutil.RunWithTimeout(goCtx, "rpc heartbeat", rpcCtx.heartbeatTimeout, ping) + err = contextutil.RunWithTimeout(ctx, "rpc heartbeat", rpcCtx.heartbeatTimeout, ping) } else { - err = ping(goCtx) + err = ping(ctx) } if grpcutil.IsConnectionRejected(err) { @@ -1425,7 +1453,7 @@ func (rpcCtx *Context) runHeartbeat( if err == nil { err = errors.Wrap( - checkVersion(goCtx, rpcCtx.Settings, response.ServerVersion), + checkVersion(ctx, rpcCtx.Settings, response.ServerVersion), "version compatibility check failed on ping response") if err != nil { returnErr = true @@ -1453,7 +1481,7 @@ func (rpcCtx *Context) runHeartbeat( remoteTimeNow := timeutil.Unix(0, response.ServerTime).Add(pingDuration / 2) request.Offset.Offset = remoteTimeNow.Sub(receiveTime).Nanoseconds() } - rpcCtx.RemoteClocks.UpdateOffset(rpcCtx.masterCtx, target, request.Offset, pingDuration) + rpcCtx.RemoteClocks.UpdateOffset(ctx, target, request.Offset, pingDuration) if cb := rpcCtx.HeartbeatCB; cb != nil { cb() diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index 1b0a76878aac..0fab06554512 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -1889,11 +1889,13 @@ func TestRunHeartbeatSetsHeartbeatStateWhenExitingBeforeFirstHeartbeat(t *testin redialChan := make(chan struct{}) close(redialChan) - c.grpcConn, _, c.dialErr = rpcCtx.grpcDialRaw(remoteAddr, serverNodeID, DefaultClass) + c.grpcConn, _, c.dialErr = rpcCtx.grpcDialRaw(rpcCtx.masterCtx, remoteAddr, serverNodeID, DefaultClass) require.NoError(t, c.dialErr) // It is possible that the redial chan being closed is not seen on the first // pass through the loop. - err = rpcCtx.runHeartbeat(c, "", redialChan) + // NB: we use rpcCtx.masterCtx and not just ctx because we need + // this to be cancelled when the RPC context is closed. + err = rpcCtx.runHeartbeat(rpcCtx.masterCtx, c, "", redialChan) require.EqualError(t, err, grpcutil.ErrCannotReuseClientConn.Error()) // Even when the runHeartbeat returns, we could have heartbeated successfully. // If we did not, then we expect the `not yet heartbeated` error. diff --git a/pkg/server/server.go b/pkg/server/server.go index 4203ed2fa1d7..bd231651fafe 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -82,6 +82,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/optionalnodeliveness" "github.com/cockroachdb/cockroach/pkg/sql/pgwire" _ "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scjob" // register jobs declared outside of pkg/sql + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/ts" @@ -1768,27 +1769,7 @@ func (s *Server) PreStart(ctx context.Context) error { // Once all stores are initialized, check if offline storage recovery // was done prior to start and record any actions appropriately. 
- if err := s.node.stores.VisitStores(func(s *kvserver.Store) error { - eventCount, err := loqrecovery.RegisterOfflineRecoveryEvents( - ctx, - s.Engine(), - func(ctx context.Context, record loqrecoverypb.ReplicaRecoveryRecord) (bool, error) { - event := record.AsStructuredLog() - log.StructuredEvent(ctx, &event) - s.Metrics().RangeLossOfQuorumRecoveries.Inc(1) - return true, nil - }) - if eventCount > 0 { - log.Infof( - ctx, "registered %d loss of quorum replica recovery events for s%d", - eventCount, s.Ident.StoreID) - } - return err - }); err != nil { - // We don't want to abort server if we can't record recovery events - // as it is the last thing we need if cluster is already unhealthy. - log.Errorf(ctx, "failed to record loss of quorum recovery events: %v", err) - } + logPendingLossOfQuorumRecoveryEvents(ctx, s.node.stores) log.Ops.Infof(ctx, "starting %s server at %s (use: %s)", redact.Safe(s.cfg.HTTPRequestScheme()), s.cfg.HTTPAddr, s.cfg.HTTPAdvertiseAddr) @@ -1971,6 +1952,12 @@ func (s *Server) PreStart(ctx context.Context) error { return errors.Wrapf(err, "failed to start KV prober") } + // As final stage of loss of quorum recovery, write events into corresponding + // range logs. We do it as a separate stage to log events early just in case + // startup fails, and write to range log once the server is running as we need + // to run sql statements to update rangelog. + publishPendingLossOfQuorumRecoveryEvents(ctx, s.node.stores, s.stopper) + log.Event(ctx, "server initialized") // Begin recording time series data collected by the status monitor. @@ -1983,6 +1970,71 @@ func (s *Server) PreStart(ctx context.Context) error { return maybeImportTS(ctx, s) } +func logPendingLossOfQuorumRecoveryEvents(ctx context.Context, stores *kvserver.Stores) { + if err := stores.VisitStores(func(s *kvserver.Store) error { + // We are not requesting entry deletion here because we need those entries + // at the end of startup to populate rangelog and possibly other durable + // cluster-replicated destinations. + eventCount, err := loqrecovery.RegisterOfflineRecoveryEvents( + ctx, + s.Engine(), + func(ctx context.Context, record loqrecoverypb.ReplicaRecoveryRecord) (bool, error) { + event := record.AsStructuredLog() + log.StructuredEvent(ctx, &event) + return false, nil + }) + if eventCount > 0 { + log.Infof( + ctx, "registered %d loss of quorum replica recovery events for s%d", + eventCount, s.Ident.StoreID) + } + return err + }); err != nil { + // We don't want to abort server if we can't record recovery events + // as it is the last thing we need if cluster is already unhealthy. + log.Errorf(ctx, "failed to record loss of quorum recovery events: %v", err) + } +} + +func publishPendingLossOfQuorumRecoveryEvents( + ctx context.Context, stores *kvserver.Stores, stopper *stop.Stopper, +) { + _ = stopper.RunAsyncTask(ctx, "publish-loss-of-quorum-events", func(ctx context.Context) { + if err := stores.VisitStores(func(s *kvserver.Store) error { + recoveryEventsSupported := s.ClusterSettings().Version.IsActive(ctx, + clusterversion.UnsafeLossOfQuorumRecoveryRangeLog) + _, err := loqrecovery.RegisterOfflineRecoveryEvents( + ctx, + s.Engine(), + func(ctx context.Context, record loqrecoverypb.ReplicaRecoveryRecord) (bool, error) { + sqlExec := func(ctx context.Context, stmt string, args ...interface{}) (int, error) { + return s.GetStoreConfig().SQLExecutor.ExecEx(ctx, "", nil, + sessiondata.InternalExecutorOverride{User: security.RootUserName()}, stmt, args...) 
+ } + if recoveryEventsSupported { + if err := loqrecovery.UpdateRangeLogWithRecovery(ctx, sqlExec, record); err != nil { + return false, errors.Wrap(err, + "loss of quorum recovery failed to write RangeLog entry") + } + } + // We only bump metrics as the last step when all processing of events + // is finished. This is done to ensure that we don't increase metrics + // more than once. + // Note that if the actual deletion of the event fails, it is possible to + // duplicate rangelog and metrics, but that is a very unlikely event + // and the user should be able to identify those events. + s.Metrics().RangeLossOfQuorumRecoveries.Inc(1) + return true, nil + }) + return err + }); err != nil { + // We don't want to abort server if we can't record recovery events + // as it is the last thing we need if cluster is already unhealthy. + log.Errorf(ctx, "failed to update range log with loss of quorum recovery events: %v", err) + } + }) +} + // ConfigureGRPCGateway initializes services necessary for running the // GRPC Gateway services proxied against the server at `grpcSrv`. // diff --git a/pkg/ui/workspaces/db-console/src/views/reports/containers/range/logTable.tsx b/pkg/ui/workspaces/db-console/src/views/reports/containers/range/logTable.tsx index ce8b571e06f9..055d320a2ed2 100644 --- a/pkg/ui/workspaces/db-console/src/views/reports/containers/range/logTable.tsx +++ b/pkg/ui/workspaces/db-console/src/views/reports/containers/range/logTable.tsx @@ -41,6 +41,9 @@ function printLogEventType( return "Split"; case protos.cockroach.kv.kvserver.storagepb.RangeLogEventType.merge: return "Merge"; + case protos.cockroach.kv.kvserver.storagepb.RangeLogEventType + .unsafe_quorum_recovery: + return "Unsafe Quorum Recovery"; default: return "Unknown"; }
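Taken together, the recovery flow is: offline recovery writes ReplicaRecoveryRecord entries to the store, logPendingLossOfQuorumRecoveryEvents emits them as structured log events early in startup, and publishPendingLossOfQuorumRecoveryEvents later inserts them into system.rangelog once SQL is available and the version gate is active. A rough wiring sketch of the new UpdateRangeLogWithRecovery helper follows; the publishRecoveryEvent function, the executor variable, and the operation name are illustrative assumptions, while the loqrecovery, sessiondata, and security calls come from the patch itself.

    package example // illustrative only, not part of the patch

    import (
        "context"

        "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery"
        "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
        "github.com/cockroachdb/cockroach/pkg/security"
        "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
        "github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
    )

    // publishRecoveryEvent turns one pending recovery record into a
    // system.rangelog row. It returns true when the caller may delete the
    // store-local record, matching the callback contract of
    // RegisterOfflineRecoveryEvents.
    func publishRecoveryEvent(
        ctx context.Context, ie sqlutil.InternalExecutor, record loqrecoverypb.ReplicaRecoveryRecord,
    ) (bool, error) {
        sqlExec := func(ctx context.Context, stmt string, args ...interface{}) (int, error) {
            return ie.ExecEx(ctx, "loq-recovery-rangelog", nil /* txn */,
                sessiondata.InternalExecutorOverride{User: security.RootUserName()},
                stmt, args...)
        }
        if err := loqrecovery.UpdateRangeLogWithRecovery(ctx, sqlExec, record); err != nil {
            // Propagating the error keeps the record in the store, so the event is
            // retried on the next startup rather than silently dropped.
            return false, err
        }
        return true, nil
    }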