114861: server: remove admin checks from server-v2 r=rafiss a=rafiss

These are now replaced by checks for the VIEWCLUSTERMETADATA privilege. The equivalent change was already made on the v1 server.

The following endpoints are affected (these endpoints are not used in the frontend, so this change does not have any user-facing impact):
- /sessions/
- /nodes/
- /nodes/{node_id}/ranges/
- /ranges/{range_id:[0-9]+}/
- /events/

informs #114384
informs #79571
informs #109814
Release note: None
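
For reference, a hedged sketch (not part of this commit) of how a non-admin user can be granted access to these endpoints after this change; the connection string, driver, and user name are illustrative placeholders:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // any Postgres-wire driver works; lib/pq is illustrative
)

func main() {
	// Placeholder connection string for a local insecure node.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// VIEWCLUSTERMETADATA is a global (system) privilege, so it is granted
	// with GRANT SYSTEM rather than on a particular object.
	if _, err := db.Exec(`GRANT SYSTEM VIEWCLUSTERMETADATA TO testuser`); err != nil {
		log.Fatal(err)
	}
}
```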

115000: streamingccl: only return lag replanning error if lagging node has not advanced r=stevendanna a=msbutler

Previously, the frontier processor would return a lag replanning error if it detected a lagging node after the hwm had advanced at any point during the flow. This meant the frontier processor could replan as soon as a lagging node finished its catchup scan and bumped the hwm while still trailing far behind the other nodes, as observed in #114706. Ideally, the frontier processor should not throw this replanning error in that case, because the lagging node is making progress and because replanning can cause repeated work.

This patch prevents this scenario by teaching the frontier processor to throw a replanning error only if both of the following hold (see the sketch after this message):
- the hwm has advanced at some point during the flow, and
- two consecutive lagging-node checks detected a lagging node and the hwm did not advance between those two checks.

Informs #114706

Release note: none
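
For illustration, here is a minimal, self-contained sketch of that rule, simplified from the patch's handleLaggingNodeError shown in the diff below; the function name, integer timestamps, and example values are ours, and the real code tracks hlc.Timestamps on the frontier processor:

```go
package main

import (
	"errors"
	"fmt"
)

var ErrNodeLagging = errors.New("node lagging")

// shouldReplan applies the rule above: surface the lag error only when two
// consecutive checks saw lag and the hwm did not advance between them.
// It returns the hwm to remember for the next check.
func shouldReplan(err error, lastMark, hwm int64) (newMark int64, out error) {
	switch {
	case err == nil:
		return 0, nil // no lagging node: reset the mark
	case !errors.Is(err, ErrNodeLagging):
		return lastMark, err // unrelated error: propagate as-is
	case lastMark < hwm:
		return hwm, nil // hwm advanced since the last lag sighting: swallow
	default:
		return lastMark, err // repeated lag with no hwm progress: replan
	}
}

func main() {
	mark, err := shouldReplan(ErrNodeLagging, 0, 1) // first sighting: swallowed
	fmt.Println(mark, err)                          // 1 <nil>
	mark, err = shouldReplan(ErrNodeLagging, mark, 1) // hwm stuck: replan
	fmt.Println(mark, err)                            // 1 node lagging
}
```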

115046: catalog/lease: fix flake for TestTableCreationPushesTxnsInRecentPast r=fqazi a=fqazi

Previously, TestTableCreationPushesTxnsInRecentPast could flake because we attempted to increase the odds of hitting the uncertainty interval error by adding a delay to KV RPC calls. This wasn't effective and could still produce intermittent failures. Instead, we now directly widen the uncertainty interval by setting a large MaxOffset on the clock, which triggers the desired behaviour far more deterministically.

Fixes: #114366

Release note: None
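
As a rough illustration of why a large MaxOffset makes the failure deterministic (made-up values; the test itself configures MaxOffset on the node's clock rather than computing anything by hand): a reader treats any value with a timestamp in (readTs, readTs + MaxOffset] as uncertain, so widening MaxOffset makes it far more likely that a recent write lands inside that window.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	maxOffset := 5 * time.Minute // illustrative; the point is that it is large
	readTs := time.Now()
	writeTs := readTs.Add(30 * time.Second) // a write slightly ahead of the reader

	// The write is "uncertain" if it falls in (readTs, readTs+maxOffset]: it may
	// causally precede the read despite carrying a higher timestamp.
	uncertain := writeTs.After(readTs) && !writeTs.After(readTs.Add(maxOffset))
	fmt.Printf("uncertain: %t\n", uncertain) // true: the read must be pushed and retried
}
```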

Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: Faizan Qazi <[email protected]>
4 people committed Nov 27, 2023
4 parents 078c69a + 47e22e1 + 9be6d5a + 1fb0eac commit b464c1c
Showing 18 changed files with 159 additions and 193 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/cdceval/doc.go
@@ -134,7 +134,6 @@ functions along with the number of arguments and the return type of the overload
crdb_internal.get_namespace_id | 2 | int8
crdb_internal.get_namespace_id | 3 | int8
crdb_internal.get_zone_config | 1 | bytea
crdb_internal.is_admin | 0 | bool
crdb_internal.locality_value | 1 | text
crdb_internal.node_id | 0 | int8
crdb_internal.num_geo_inverted_index_entries | 3 | int8
17 changes: 0 additions & 17 deletions pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant
@@ -573,23 +573,6 @@ SELECT
FROM
table41834;


subtest builtin_is_admin

user root

query B
SELECT crdb_internal.is_admin()
----
true

user testuser

query B
SELECT crdb_internal.is_admin()
----
false

user root

# Test the crdb_internal.create_type_statements table.
@@ -78,6 +78,10 @@ type streamIngestionFrontier struct {
partitionProgress map[string]jobspb.StreamIngestionProgress_PartitionProgress

lastNodeLagCheck time.Time

// replicatedTimeAtLastPositiveLagNodeCheck records the replicated time the
// last time the lagging node checker detected a lagging node.
replicatedTimeAtLastPositiveLagNodeCheck hlc.Timestamp
}

var _ execinfra.Processor = &streamIngestionFrontier{}
@@ -553,9 +557,29 @@ func (sf *streamIngestionFrontier) maybeCheckForLaggingNodes() error {
}()
executionDetails := constructSpanFrontierExecutionDetailsWithFrontier(sf.spec.PartitionSpecs, sf.frontier)
log.VEvent(ctx, 2, "checking for lagging nodes")
return checkLaggingNodes(
err := checkLaggingNodes(
ctx,
executionDetails,
maxLag,
)
return sf.handleLaggingNodeError(ctx, err)
}

func (sf *streamIngestionFrontier) handleLaggingNodeError(ctx context.Context, err error) error {
switch {
case err == nil:
sf.replicatedTimeAtLastPositiveLagNodeCheck = hlc.Timestamp{}
log.VEvent(ctx, 2, "no lagging nodes after check")
return nil
case !errors.Is(err, ErrNodeLagging):
return err
case sf.replicatedTimeAtLastPositiveLagNodeCheck.Less(sf.persistedReplicatedTime):
sf.replicatedTimeAtLastPositiveLagNodeCheck = sf.persistedReplicatedTime
log.Infof(ctx, "detected a lagging node: %s. Don't forward error because replicated time at last check %s is less than current replicated time %s", err, sf.replicatedTimeAtLastPositiveLagNodeCheck, sf.persistedReplicatedTime)
return nil
case sf.replicatedTimeAtLastPositiveLagNodeCheck.Equal(sf.persistedReplicatedTime):
return errors.Wrapf(err, "hwm has not advanced from %s", sf.persistedReplicatedTime)
default:
return errors.Wrapf(err, "unable to handle replanning error with replicated time %s and last node lag check replicated time %s", sf.persistedReplicatedTime, sf.replicatedTimeAtLastPositiveLagNodeCheck)
}
}
@@ -152,3 +152,78 @@ func TestHeartbeatLoop(t *testing.T) {
}
}
}

func TestLaggingNodeErrorHandler(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

type testCase struct {
name string
replicatedTime int64
previousReplicatedTimeOnLag int64
inputLagErr error

expectedNewReplicatedTimeOnLag int64
expectedErrMsg string
}

for _, tc := range []testCase{
{
name: "no more lag",
previousReplicatedTimeOnLag: 1,
expectedNewReplicatedTimeOnLag: 0,
},
{
name: "new lag",
previousReplicatedTimeOnLag: 0,
replicatedTime: 1,
expectedNewReplicatedTimeOnLag: 1,
inputLagErr: ErrNodeLagging,
},
{
name: "repeated lag, no hwm advance",
previousReplicatedTimeOnLag: 1,
replicatedTime: 1,
expectedNewReplicatedTimeOnLag: 1,
inputLagErr: ErrNodeLagging,
expectedErrMsg: ErrNodeLagging.Error(),
},
{
name: "repeated lag, with hwm advance",
previousReplicatedTimeOnLag: 1,
replicatedTime: 2,
expectedNewReplicatedTimeOnLag: 2,
inputLagErr: ErrNodeLagging,
},
{
name: "non lag error",
inputLagErr: errors.New("unexpected"),
expectedErrMsg: "unexpected",
},
{
name: "unhandlable lag error",
previousReplicatedTimeOnLag: 2,
replicatedTime: 1,
expectedNewReplicatedTimeOnLag: 2,
inputLagErr: ErrNodeLagging,
expectedErrMsg: "unable to handle replanning",
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
sf := streamIngestionFrontier{
persistedReplicatedTime: hlc.Timestamp{WallTime: tc.replicatedTime},
replicatedTimeAtLastPositiveLagNodeCheck: hlc.Timestamp{WallTime: tc.previousReplicatedTimeOnLag},
}
err := sf.handleLaggingNodeError(ctx, tc.inputLagErr)
if tc.expectedErrMsg == "" {
require.NoError(t, err)
} else {
require.ErrorContains(t, err, tc.expectedErrMsg)
}
require.Equal(t, hlc.Timestamp{WallTime: tc.expectedNewReplicatedTimeOnLag}, sf.replicatedTimeAtLastPositiveLagNodeCheck)
})
}
}
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -1752,7 +1752,7 @@ func destClusterSettings(t test.Test, db *sqlutils.SQLRunner, additionalDuration
db.ExecMultiple(t, `SET CLUSTER SETTING cross_cluster_replication.enabled = true;`,
`SET CLUSTER SETTING kv.rangefeed.enabled = true;`,
`SET CLUSTER SETTING stream_replication.replan_flow_threshold = 0.1;`,
`SET CLUSTER SETTING physical_replication.consumer.node_lag_replanning_threshold = '10m';`)
`SET CLUSTER SETTING physical_replication.consumer.node_lag_replanning_threshold = '5m';`)

if additionalDuration != 0 {
replanFrequency := additionalDuration / 2
31 changes: 24 additions & 7 deletions pkg/server/api_v2.go
@@ -40,12 +40,14 @@ import (
"strconv"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/apiconstants"
"github.com/cockroachdb/cockroach/pkg/server/apiutil"
"github.com/cockroachdb/cockroach/pkg/server/authserver"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/server/srverrors"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/gorilla/mux"
@@ -169,16 +171,16 @@ func registerRoutes(
{"logout/", a.authServer.ServeHTTP, false /* requiresAuth */, authserver.RegularRole, false},

// Directly register other endpoints in the api server.
{"sessions/", a.listSessions, true /* requiresAuth */, authserver.AdminRole, false},
{"nodes/", systemRoutes.listNodes, true, authserver.AdminRole, false},
{"sessions/", a.listSessions, true /* requiresAuth */, authserver.ViewClusterMetadataRole, false},
{"nodes/", systemRoutes.listNodes, true, authserver.ViewClusterMetadataRole, false},
// Any endpoint returning range information requires an admin user. This is because range start/end keys
// are sensitive info.
{"nodes/{node_id}/ranges/", systemRoutes.listNodeRanges, true, authserver.AdminRole, false},
{"ranges/hot/", a.listHotRanges, true, authserver.AdminRole, false},
{"ranges/{range_id:[0-9]+}/", a.listRange, true, authserver.AdminRole, false},
{"nodes/{node_id}/ranges/", systemRoutes.listNodeRanges, true, authserver.ViewClusterMetadataRole, false},
{"ranges/hot/", a.listHotRanges, true, authserver.ViewClusterMetadataRole, false},
{"ranges/{range_id:[0-9]+}/", a.listRange, true, authserver.ViewClusterMetadataRole, false},
{"health/", systemRoutes.health, false, authserver.RegularRole, false},
{"users/", a.listUsers, true, authserver.RegularRole, false},
{"events/", a.listEvents, true, authserver.AdminRole, false},
{"events/", a.listEvents, true, authserver.ViewClusterMetadataRole, false},
{"databases/", a.listDatabases, true, authserver.RegularRole, false},
{"databases/{database_name:[\\w.]+}/", a.databaseDetails, true, authserver.RegularRole, false},
{"databases/{database_name:[\\w.]+}/grants/", a.databaseGrants, true, authserver.RegularRole, false},
@@ -203,10 +205,25 @@ func registerRoutes(
http.Error(w, "Not Available on Tenants", http.StatusNotImplemented)
}))
}

// Tell the authz server how to connect to SQL.
authzAccessorFactory := func(ctx context.Context, opName string) (sql.AuthorizationAccessor, func()) {
txn := a.db.NewTxn(ctx, opName)
p, cleanup := sql.NewInternalPlanner(
opName,
txn,
username.RootUserName(),
&sql.MemoryMetrics{},
a.sqlServer.execCfg,
sql.NewInternalSessionData(ctx, a.sqlServer.execCfg.Settings, opName),
)
return p.(sql.AuthorizationAccessor), cleanup
}

if route.requiresAuth {
a.mux.Handle(apiconstants.APIV2Path+route.url, authMux)
if route.role != authserver.RegularRole {
handler = authserver.NewRoleAuthzMux(a.sqlServer.internalExecutor, route.role, handler)
handler = authserver.NewRoleAuthzMux(authzAccessorFactory, route.role, handler)
}
innerMux.Handle(apiconstants.APIV2Path+route.url, handler)
} else {
2 changes: 2 additions & 0 deletions pkg/server/authserver/BUILD.bazel
@@ -26,8 +26,10 @@ go_library(
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/isql",
"//pkg/sql/privilege",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/syntheticprivilege",
"//pkg/sql/types",
"//pkg/ui",
"//pkg/util/grpcutil",
11 changes: 6 additions & 5 deletions pkg/server/authserver/api_v2.go
@@ -15,7 +15,6 @@ import (
"net/http"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
)

type ServerV2 interface {
@@ -61,10 +60,12 @@ func NewV2Mux(s ServerV2, inner http.Handler, allowAnonymous bool) AuthV2Mux {
}

// NewRoleAuthzMux creates a new RoleAuthzMux.
func NewRoleAuthzMux(ie isql.Executor, role APIRole, inner http.Handler) RoleAuthzMux {
func NewRoleAuthzMux(
authzAccessorFactory authzAccessorFactory, role APIRole, inner http.Handler,
) RoleAuthzMux {
return &roleAuthorizationMux{
ie: ie,
role: role,
inner: inner,
authzAccessorFactory: authzAccessorFactory,
role: role,
inner: inner,
}
}
51 changes: 21 additions & 30 deletions pkg/server/authserver/api_v2_auth.go
@@ -19,9 +19,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/apiutil"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/server/srverrors"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
@@ -344,52 +345,42 @@ type APIRole int
const (
// RegularRole is the default role for an APIv2 endpoint.
RegularRole APIRole = iota
// AdminRole is the role for an APIv2 endpoint that requires
// admin privileges.
AdminRole
// SuperUserRole is the role for an APIv2 endpoint that requires
// superuser privileges.
SuperUserRole
// ViewClusterMetadataRole is the role for an APIv2 endpoint that requires
// VIEWCLUSTERMETADATA privileges.
ViewClusterMetadataRole
)

type authzAccessorFactory func(ctx context.Context, opName string) (_ sql.AuthorizationAccessor, cleanup func())

// roleAuthorizationMux enforces a role (eg. type of user) for an arbitrary
// inner mux. Meant to be used under authenticationV2Mux. If the logged-in user
// is not at least of `role` type, an HTTP 403 forbidden error is returned.
// Otherwise, the request is passed onto the inner http.Handler.
type roleAuthorizationMux struct {
ie isql.Executor
role APIRole
inner http.Handler
authzAccessorFactory authzAccessorFactory
role APIRole
inner http.Handler
}

func (r *roleAuthorizationMux) getRoleForUser(
ctx context.Context, user username.SQLUsername,
) (APIRole, error) {
if user.IsRootUser() {
// Shortcut.
return SuperUserRole, nil
return ViewClusterMetadataRole, nil
}
row, err := r.ie.QueryRowEx(
ctx, "check-is-admin", nil, /* txn */
sessiondata.InternalExecutorOverride{User: user},
"SELECT crdb_internal.is_admin()")

authzAccessor, cleanup := r.authzAccessorFactory(ctx, "check-privilege")
defer cleanup()

hasPriv, err := authzAccessor.HasPrivilege(ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.VIEWCLUSTERMETADATA, user)
if err != nil {
return RegularRole, err
} else if hasPriv {
return ViewClusterMetadataRole, nil
} else {
return RegularRole, nil
}
if row == nil {
return RegularRole, errors.AssertionFailedf("hasAdminRole: expected 1 row, got 0")
}
if len(row) != 1 {
return RegularRole, errors.AssertionFailedf("hasAdminRole: expected 1 column, got %d", len(row))
}
dbDatum, ok := tree.AsDBool(row[0])
if !ok {
return RegularRole, errors.AssertionFailedf("hasAdminRole: expected bool, got %T", row[0])
}
if dbDatum {
return AdminRole, nil
}
return RegularRole, nil
}

func (r *roleAuthorizationMux) ServeHTTP(w http.ResponseWriter, req *http.Request) {
3 changes: 0 additions & 3 deletions pkg/server/privchecker/BUILD.bazel
@@ -17,10 +17,7 @@ go_library(
"//pkg/sql/isql",
"//pkg/sql/privilege",
"//pkg/sql/roleoption",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/syntheticprivilege",
"@com_github_cockroachdb_errors//:errors",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
],
