Skip to content

Commit

Permalink
feat(bigtable): Support AuthorizedView in data and admin client (#9515)
Browse files Browse the repository at this point in the history
* feat(bigtable): Support AuthorizedView in data and admin client

Change-Id: I1c04c6abb7a0ecaaa5a95334b9e6934f638b5afb

* fix: Address review comments

Change-Id: Id22fe08bc0a830cab83f30d892f6d78c269d2895

* feat: Add AuthorizedView IAM

Change-Id: I9781e0882f1b152228edb276f12284d5ccf64315

* fix: fix request headers for AuthorizedViews

Change-Id: I5dc6c0c79067903fe5c71c0f9d51e40d3f5233ca

* fix: resolve vet failures

Change-Id: Ie1e82d3a8389d26217f0fc10ffa9ca16f3b72b6d

* Expose FamilySubset struct and a small fix

Change-Id: Ia07847f4c43197d8498ead0e225651fe2b4713d7

* Address review comments

Change-Id: I99f87a46cd96d96710aac4532a61361dcc7e2a27

* Let Get method return Info struct and reordering

Change-Id: I974abdf400050d65b76a2fed34685e20d2e7781f

* fix: Remove GetSubsetView() and reordering

Change-Id: I28880079e167d37cad1fc6740c089e51be10f347

---------

Co-authored-by: trollyxia <[email protected]>
  • Loading branch information
trollyxia and trollyxia authored Apr 29, 2024
1 parent 31558c7 commit 8259645
Show file tree
Hide file tree
Showing 4 changed files with 910 additions and 7 deletions.
269 changes: 269 additions & 0 deletions bigtable/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ func (ac *AdminClient) backupPath(cluster, instance, backup string) string {
return fmt.Sprintf("projects/%s/instances/%s/clusters/%s/backups/%s", ac.project, instance, cluster, backup)
}

func (ac *AdminClient) authorizedViewPath(table, authorizedView string) string {
return fmt.Sprintf("%s/tables/%s/authorizedViews/%s", ac.instancePrefix(), table, authorizedView)
}

// EncryptionInfo represents the encryption info of a table.
type EncryptionInfo struct {
Status *Status
Expand Down Expand Up @@ -881,6 +885,11 @@ func (ac *AdminClient) BackupIAM(cluster, backup string) *iam.Handle {
return iam.InternalNewHandleGRPCClient(ac.tClient, ac.backupPath(cluster, ac.instance, backup))
}

// AuthorizedViewIAM creates an IAM Handle specific to a given Table and AuthorizedView.
func (ac *AdminClient) AuthorizedViewIAM(table, authorizedView string) *iam.Handle {
return iam.InternalNewHandleGRPCClient(ac.tClient, ac.authorizedViewPath(table, authorizedView))
}

const instanceAdminAddr = "bigtableadmin.googleapis.com:443"
const mtlsInstanceAdminAddr = "bigtableadmin.mtls.googleapis.com:443"

Expand Down Expand Up @@ -2175,3 +2184,263 @@ func (ac *AdminClient) UpdateBackup(ctx context.Context, cluster, backup string,
_, err := ac.tClient.UpdateBackup(ctx, req)
return err
}

// AuthorizedViewConf contains information about an authorized view.
type AuthorizedViewConf struct {
TableID string
AuthorizedViewID string

// Types that are valid to be assigned to AuthorizedView:
// *SubsetViewConf
AuthorizedView isAuthorizedView
DeletionProtection DeletionProtection
}

// A private interface that currently only implemented by SubsetViewConf, ensuring that only SubsetViewConf instances are accepted as an AuthorizedView.
// In the future if a new type of AuthorizedView is introduced, it should also implements this interface.
type isAuthorizedView interface {
isAuthorizedView()
}

func (av AuthorizedViewConf) proto() *btapb.AuthorizedView {
var avp btapb.AuthorizedView

switch dp := av.DeletionProtection; dp {
case Protected:
avp.DeletionProtection = true
case Unprotected:
avp.DeletionProtection = false
default:
break
}

switch avt := av.AuthorizedView.(type) {
case *SubsetViewConf:
avp.AuthorizedView = &btapb.AuthorizedView_SubsetView_{
SubsetView: avt.proto(),
}
default:
break
}
return &avp
}

// FamilySubset represents a subset of a column family.
type FamilySubset struct {
Qualifiers [][]byte
QualifierPrefixes [][]byte
}

// SubsetViewConf contains configuration specific to an authorized view of subset view type.
type SubsetViewConf struct {
RowPrefixes [][]byte
FamilySubsets map[string]FamilySubset
}

func (*SubsetViewConf) isAuthorizedView() {}

// AddRowPrefix adds a new row prefix to the subset view.
func (s *SubsetViewConf) AddRowPrefix(prefix []byte) {
s.RowPrefixes = append(s.RowPrefixes, prefix)
}

func (s *SubsetViewConf) getOrCreateFamilySubset(familyName string) FamilySubset {
if s.FamilySubsets == nil {
s.FamilySubsets = make(map[string]FamilySubset)
}
if _, ok := s.FamilySubsets[familyName]; !ok {
s.FamilySubsets[familyName] = FamilySubset{}
}
return s.FamilySubsets[familyName]
}

func (s SubsetViewConf) proto() *btapb.AuthorizedView_SubsetView {
var p btapb.AuthorizedView_SubsetView
p.RowPrefixes = append(p.RowPrefixes, s.RowPrefixes...)
if p.FamilySubsets == nil {
p.FamilySubsets = make(map[string]*btapb.AuthorizedView_FamilySubsets)
}
for familyName, subset := range s.FamilySubsets {
p.FamilySubsets[familyName] = &btapb.AuthorizedView_FamilySubsets{
Qualifiers: subset.Qualifiers,
QualifierPrefixes: subset.QualifierPrefixes,
}
}
return &p
}

// AddFamilySubsetQualifier adds an individual column qualifier to be included in a subset view.
func (s *SubsetViewConf) AddFamilySubsetQualifier(familyName string, qualifier []byte) {
fs := s.getOrCreateFamilySubset(familyName)
fs.Qualifiers = append(fs.Qualifiers, qualifier)
s.FamilySubsets[familyName] = fs
}

// AddFamilySubsetQualifierPrefix adds a prefix for column qualifiers to be included in a subset view.
func (s *SubsetViewConf) AddFamilySubsetQualifierPrefix(familyName string, qualifierPrefix []byte) {
fs := s.getOrCreateFamilySubset(familyName)
fs.QualifierPrefixes = append(fs.QualifierPrefixes, qualifierPrefix)
s.FamilySubsets[familyName] = fs
}

// CreateAuthorizedView creates a new authorized view in a table.
func (ac *AdminClient) CreateAuthorizedView(ctx context.Context, conf *AuthorizedViewConf) error {
if conf.TableID == "" || conf.AuthorizedViewID == "" {
return errors.New("both AuthorizedViewID and TableID are required")
}
if _, ok := conf.AuthorizedView.(*SubsetViewConf); !ok {
return errors.New("SubsetView must be specified in AuthorizedViewConf")
}

ctx = mergeOutgoingMetadata(ctx, ac.md)
req := &btapb.CreateAuthorizedViewRequest{
Parent: fmt.Sprintf("%s/tables/%s", ac.instancePrefix(), conf.TableID),
AuthorizedViewId: conf.AuthorizedViewID,
AuthorizedView: conf.proto(),
}
_, err := ac.tClient.CreateAuthorizedView(ctx, req)
return err
}

// AuthorizedViewInfo contains authorized view metadata. This struct is read-only.
type AuthorizedViewInfo struct {
TableID string
AuthorizedViewID string

AuthorizedView isAuthorizedViewInfo
DeletionProtection DeletionProtection
}

type isAuthorizedViewInfo interface {
isAuthorizedViewInfo()
}

// SubsetViewInfo contains read-only SubsetView metadata.
type SubsetViewInfo struct {
RowPrefixes [][]byte
FamilySubsets map[string]FamilySubset
}

func (*SubsetViewInfo) isAuthorizedViewInfo() {}

func (s *SubsetViewInfo) fillInfo(internal *btapb.AuthorizedView_SubsetView) {
s.RowPrefixes = [][]byte{}
s.RowPrefixes = append(s.RowPrefixes, internal.RowPrefixes...)
if s.FamilySubsets == nil {
s.FamilySubsets = make(map[string]FamilySubset)
}
for k, v := range internal.FamilySubsets {
s.FamilySubsets[k] = FamilySubset{
Qualifiers: v.Qualifiers,
QualifierPrefixes: v.QualifierPrefixes,
}
}
}

// AuthorizedViewInfo retrieves information about an authorized view.
func (ac *AdminClient) AuthorizedViewInfo(ctx context.Context, tableID, authorizedViewID string) (*AuthorizedViewInfo, error) {
ctx = mergeOutgoingMetadata(ctx, ac.md)
req := &btapb.GetAuthorizedViewRequest{
Name: fmt.Sprintf("%s/tables/%s/authorizedViews/%s", ac.instancePrefix(), tableID, authorizedViewID),
}
var res *btapb.AuthorizedView

err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
var err error
res, err = ac.tClient.GetAuthorizedView(ctx, req)
return err
}, retryOptions...)

if err != nil {
return nil, err
}

av := &AuthorizedViewInfo{TableID: tableID, AuthorizedViewID: authorizedViewID}
if res.DeletionProtection {
av.DeletionProtection = Protected
} else {
av.DeletionProtection = Unprotected
}
if res.GetSubsetView() != nil {
s := SubsetViewInfo{}
s.fillInfo(res.GetSubsetView())
av.AuthorizedView = &s
}
return av, nil
}

// AuthorizedViews returns a list of the authorized views in the table.
func (ac *AdminClient) AuthorizedViews(ctx context.Context, tableID string) ([]string, error) {
names := []string{}
prefix := fmt.Sprintf("%s/tables/%s", ac.instancePrefix(), tableID)

req := &btapb.ListAuthorizedViewsRequest{
Parent: prefix,
View: btapb.AuthorizedView_NAME_ONLY,
}
var res *btapb.ListAuthorizedViewsResponse
err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
var err error
res, err = ac.tClient.ListAuthorizedViews(ctx, req)
return err
}, retryOptions...)
if err != nil {
return nil, err
}

for _, av := range res.AuthorizedViews {
names = append(names, strings.TrimPrefix(av.Name, prefix+"/authorizedViews/"))
}
return names, nil
}

// UpdateAuthorizedViewConf contains all the information necessary to update or partial update an authorized view.
type UpdateAuthorizedViewConf struct {
AuthorizedViewConf AuthorizedViewConf
IgnoreWarnings bool
}

// UpdateAuthorizedView updates an authorized view in a table according to the given configuration.
func (ac *AdminClient) UpdateAuthorizedView(ctx context.Context, conf UpdateAuthorizedViewConf) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
if conf.AuthorizedViewConf.TableID == "" || conf.AuthorizedViewConf.AuthorizedViewID == "" {
return errors.New("both AuthorizedViewID and TableID is required")
}
av := conf.AuthorizedViewConf.proto()
av.Name = ac.authorizedViewPath(conf.AuthorizedViewConf.TableID, conf.AuthorizedViewConf.AuthorizedViewID)

updateMask := &field_mask.FieldMask{
Paths: []string{},
}
if conf.AuthorizedViewConf.DeletionProtection != None {
updateMask.Paths = append(updateMask.Paths, "deletion_protection")
}
if _, ok := conf.AuthorizedViewConf.AuthorizedView.(*SubsetViewConf); ok {
updateMask.Paths = append(updateMask.Paths, "subset_view")
}
req := &btapb.UpdateAuthorizedViewRequest{
AuthorizedView: av,
UpdateMask: updateMask,
IgnoreWarnings: conf.IgnoreWarnings,
}
lro, err := ac.tClient.UpdateAuthorizedView(ctx, req)
if err != nil {
return fmt.Errorf("error from update authorized view: %w", err)
}
var res btapb.AuthorizedView
op := longrunning.InternalNewOperation(ac.lroClient, lro)
if err = op.Wait(ctx, &res); err != nil {
return fmt.Errorf("error from operation: %v", err)
}
return nil
}

// DeleteAuthorizedView deletes an authorized view in a table.
func (ac *AdminClient) DeleteAuthorizedView(ctx context.Context, tableID, authorizedViewID string) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
req := &btapb.DeleteAuthorizedViewRequest{
Name: ac.authorizedViewPath(tableID, authorizedViewID),
}
_, err := ac.tClient.DeleteAuthorizedView(ctx, req)
return err
}
Loading

0 comments on commit 8259645

Please sign in to comment.