-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
authorizer.go
77 lines (68 loc) · 2.25 KB
/
authorizer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package tenantcapabilitiesauthorizer
import (
"context"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
// Authorizer is a concrete implementation of the tenantcapabilities.Authorizer
// interface. It's safe for concurrent use.
type Authorizer struct {
capabilitiesReader tenantcapabilities.Reader
}
var _ tenantcapabilities.Authorizer = &Authorizer{}
// New constructs a new tenantcapabilities.Authorizer.
func New() *Authorizer {
a := &Authorizer{
// capabilitiesReader is set post construction, using BindReader.
}
return a
}
// HasCapabilityForBatch implements the tenantcapabilities.Authorizer interface.
func (a *Authorizer) HasCapabilityForBatch(
ctx context.Context, tenID roachpb.TenantID, ba *roachpb.BatchRequest,
) error {
if tenID.IsSystem() {
return nil // the system tenant is allowed to do as it pleases
}
if a.capabilitiesReader == nil {
log.Fatal(ctx, "trying to perform capability check when no Reader exists")
}
cp, found := a.capabilitiesReader.GetCapabilities(tenID)
if !found {
log.VInfof(
ctx,
3,
"no capability information for tenant %s; requests that require capabilities may be denied",
tenID,
)
}
for _, ru := range ba.Requests {
switch ru.GetInner().(type) {
case *roachpb.AdminSplitRequest:
if !cp.CanAdminSplit {
return errors.Newf("tenant %s does not have admin split capability", tenID)
}
default:
// No capability checks for other types of requests.
}
}
return nil
}
// BindReader implements the tenantcapabilities.Authorizer interface.
func (a *Authorizer) BindReader(ctx context.Context, reader tenantcapabilities.Reader) {
if a.capabilitiesReader != nil {
log.Fatal(ctx, "cannot bind a tenant capabilities reader more than once")
}
a.capabilitiesReader = reader
}