Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): Support AuthorizedView in data and admin client #9515

Merged
merged 10 commits into from
Apr 29, 2024
256 changes: 256 additions & 0 deletions bigtable/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ func (ac *AdminClient) backupPath(cluster, instance, backup string) string {
return fmt.Sprintf("projects/%s/instances/%s/clusters/%s/backups/%s", ac.project, instance, cluster, backup)
}

func (ac *AdminClient) authorizedViewPath(table, authorizedView string) string {
return fmt.Sprintf("%s/tables/%s/authorizedViews/%s", ac.instancePrefix(), table, authorizedView)
}

// EncryptionInfo represents the encryption info of a table.
type EncryptionInfo struct {
Status *Status
Expand Down Expand Up @@ -881,6 +885,11 @@ func (ac *AdminClient) BackupIAM(cluster, backup string) *iam.Handle {
return iam.InternalNewHandleGRPCClient(ac.tClient, ac.backupPath(cluster, ac.instance, backup))
}

// AuthorizedViewIAM creates an IAM Handle specific to a given Table and AuthorizedView.
func (ac *AdminClient) AuthorizedViewIAM(table, authorizedView string) *iam.Handle {
return iam.InternalNewHandleGRPCClient(ac.tClient, ac.authorizedViewPath(table, authorizedView))
}

const instanceAdminAddr = "bigtableadmin.googleapis.com:443"
const mtlsInstanceAdminAddr = "bigtableadmin.mtls.googleapis.com:443"

Expand Down Expand Up @@ -2175,3 +2184,250 @@ func (ac *AdminClient) UpdateBackup(ctx context.Context, cluster, backup string,
_, err := ac.tClient.UpdateBackup(ctx, req)
return err
}

// AuthorizedViewConf contains information about an authorized view.
type AuthorizedViewConf struct {
TableID string
AuthorizedViewID string

// Types that are valid to be assigned to AuthorizedView:
// *SubsetViewConf
AuthorizedView isAuthorizedView
DeletionProtection DeletionProtection
}

// A private interface that currently only implemented by SubsetViewConf, ensuring that only SubsetViewConf instances are accepted as an AuthorizedView.
// In the future if a new type of AuthorizedView is introduced, it should also implements this interface.
type isAuthorizedView interface {
isAuthorizedView()
}

func (*SubsetViewConf) isAuthorizedView() {}
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

// SubsetViewConf contains configuration specific to an authorized view of subset view type.
type SubsetViewConf struct {
RowPrefixes [][]byte
FamilySubsets map[string]FamilySubset
}

func (av AuthorizedViewConf) proto() *btapb.AuthorizedView {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
var avp btapb.AuthorizedView

switch dp := av.DeletionProtection; dp {
case Protected:
avp.DeletionProtection = true
case Unprotected:
avp.DeletionProtection = false
default:
break
}

switch avt := av.AuthorizedView.(type) {
case *SubsetViewConf:
avp.AuthorizedView = &btapb.AuthorizedView_SubsetView_{
SubsetView: avt.proto(),
}
default:
break
}
return &avp
}

// GetSubsetView returns nil if the type of the AuthorizedView is not SubsetViewConf, otherwise it returns the pointer to the SubsetViewConf instance.
func (av *AuthorizedViewConf) GetSubsetView() *SubsetViewConf {
if sv, ok := av.AuthorizedView.(*SubsetViewConf); ok {
return sv
}
return nil
}
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

// FamilySubset represents a subset of a column family.
type FamilySubset struct {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
Qualifiers [][]byte
QualifierPrefixes [][]byte
}

// AddRowPrefix adds a new row prefix to the subset view.
func (s *SubsetViewConf) AddRowPrefix(prefix []byte) {
s.RowPrefixes = append(s.RowPrefixes, prefix)
}

func (s *SubsetViewConf) getFamilySubset(familyName string) FamilySubset {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
if s.FamilySubsets == nil {
s.FamilySubsets = make(map[string]FamilySubset)
}
if _, ok := s.FamilySubsets[familyName]; !ok {
s.FamilySubsets[familyName] = FamilySubset{}
}
return s.FamilySubsets[familyName]
}

func (s SubsetViewConf) proto() *btapb.AuthorizedView_SubsetView {
var p btapb.AuthorizedView_SubsetView
p.RowPrefixes = append(p.RowPrefixes, s.RowPrefixes...)
if p.FamilySubsets == nil {
p.FamilySubsets = make(map[string]*btapb.AuthorizedView_FamilySubsets)
}
for familyName, subset := range s.FamilySubsets {
p.FamilySubsets[familyName] = &btapb.AuthorizedView_FamilySubsets{
Qualifiers: subset.Qualifiers,
QualifierPrefixes: subset.QualifierPrefixes,
}
}
return &p
}

func (s *SubsetViewConf) fillConf(internal *btapb.AuthorizedView_SubsetView) {
s.RowPrefixes = [][]byte{}
s.RowPrefixes = append(s.RowPrefixes, internal.RowPrefixes...)
if s.FamilySubsets == nil {
s.FamilySubsets = make(map[string]FamilySubset)
}
for k, v := range internal.FamilySubsets {
s.FamilySubsets[k] = FamilySubset{
Qualifiers: v.Qualifiers,
QualifierPrefixes: v.QualifierPrefixes,
}
}
}

// AddFamilySubsetQualifier adds an individual column qualifier to be included in a subset view.
func (s *SubsetViewConf) AddFamilySubsetQualifier(familyName string, qualifier []byte) {
fs := s.getFamilySubset(familyName)
fs.Qualifiers = append(fs.Qualifiers, qualifier)
s.FamilySubsets[familyName] = fs
}

// AddFamilySubsetQualifierPrefix adds a prefix for column qualifiers to be included in a subset view.
func (s *SubsetViewConf) AddFamilySubsetQualifierPrefix(familyName string, qualifierPrefix []byte) {
fs := s.getFamilySubset(familyName)
fs.QualifierPrefixes = append(fs.QualifierPrefixes, qualifierPrefix)
s.FamilySubsets[familyName] = fs
}

// CreateAuthorizedView creates a new authorized view in a table.
func (ac *AdminClient) CreateAuthorizedView(ctx context.Context, conf *AuthorizedViewConf) error {
if conf.TableID == "" || conf.AuthorizedViewID == "" {
return errors.New("both AuthorizedViewID and TableID are required")
}
if conf.GetSubsetView() == nil {
return errors.New("SubsetView must be specified in AuthorizedViewConf")
}

ctx = mergeOutgoingMetadata(ctx, ac.md)
req := &btapb.CreateAuthorizedViewRequest{
Parent: fmt.Sprintf("%s/tables/%s", ac.instancePrefix(), conf.TableID),
AuthorizedViewId: conf.AuthorizedViewID,
AuthorizedView: conf.proto(),
}
_, err := ac.tClient.CreateAuthorizedView(ctx, req)
return err
}

// GetAuthorizedView retrieves information about an authorized view.
func (ac *AdminClient) GetAuthorizedView(ctx context.Context, tableID, authorizedViewID string) (*AuthorizedViewConf, error) {
ctx = mergeOutgoingMetadata(ctx, ac.md)
req := &btapb.GetAuthorizedViewRequest{
Name: fmt.Sprintf("%s/tables/%s/authorizedViews/%s", ac.instancePrefix(), tableID, authorizedViewID),
}
var res *btapb.AuthorizedView

err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
var err error
res, err = ac.tClient.GetAuthorizedView(ctx, req)
return err
}, retryOptions...)

if err != nil {
return nil, err
}

av := &AuthorizedViewConf{TableID: tableID, AuthorizedViewID: authorizedViewID}
if res.DeletionProtection {
av.DeletionProtection = Protected
} else {
av.DeletionProtection = Unprotected
}
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
if res.GetSubsetView() != nil {
s := SubsetViewConf{}
s.fillConf(res.GetSubsetView())
av.AuthorizedView = &s
}
return av, nil
}

// AuthorizedViews returns a list of the authorized views in the table.
func (ac *AdminClient) AuthorizedViews(ctx context.Context, tableID string) ([]string, error) {
names := []string{}
prefix := fmt.Sprintf("%s/tables/%s", ac.instancePrefix(), tableID)

req := &btapb.ListAuthorizedViewsRequest{
Parent: prefix,
View: btapb.AuthorizedView_NAME_ONLY,
}
var res *btapb.ListAuthorizedViewsResponse
err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
var err error
res, err = ac.tClient.ListAuthorizedViews(ctx, req)
return err
}, retryOptions...)
if err != nil {
return nil, err
}

for _, av := range res.AuthorizedViews {
names = append(names, strings.TrimPrefix(av.Name, prefix+"/authorizedViews/"))
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
}
return names, nil
}

// UpdateAuthorizedViewConf contains all the information necessary to update or partial update an authorized view.
type UpdateAuthorizedViewConf struct {
AuthorizedViewConf AuthorizedViewConf
IgnoreWarnings bool
}

// UpdateAuthorizedView updates an authorized view in a table according to the given configuration.
func (ac *AdminClient) UpdateAuthorizedView(ctx context.Context, conf UpdateAuthorizedViewConf) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
if conf.AuthorizedViewConf.TableID == "" || conf.AuthorizedViewConf.AuthorizedViewID == "" {
return errors.New("both AuthorizedViewID and TableID is required")
}
av := conf.AuthorizedViewConf.proto()
av.Name = ac.authorizedViewPath(conf.AuthorizedViewConf.TableID, conf.AuthorizedViewConf.AuthorizedViewID)

updateMask := &field_mask.FieldMask{
Paths: []string{},
}
if conf.AuthorizedViewConf.DeletionProtection != None {
updateMask.Paths = append(updateMask.Paths, "deletion_protection")
}
if sv := conf.AuthorizedViewConf.GetSubsetView(); sv != nil {
updateMask.Paths = append(updateMask.Paths, "subset_view")
}
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
req := &btapb.UpdateAuthorizedViewRequest{
AuthorizedView: av,
UpdateMask: updateMask,
IgnoreWarnings: conf.IgnoreWarnings,
}
lro, err := ac.tClient.UpdateAuthorizedView(ctx, req)
if err != nil {
return fmt.Errorf("error from update authorized view: %w", err)
}
var res btapb.AuthorizedView
op := longrunning.InternalNewOperation(ac.lroClient, lro)
if err = op.Wait(ctx, &res); err != nil {
return fmt.Errorf("error from operation: %v", err)
}
return nil
}

// DeleteAuthorizedView deletes an authorized view in a table.
func (ac *AdminClient) DeleteAuthorizedView(ctx context.Context, tableID, authorizedViewID string) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
req := &btapb.DeleteAuthorizedViewRequest{
Name: ac.authorizedViewPath(tableID, authorizedViewID),
}
_, err := ac.tClient.DeleteAuthorizedView(ctx, req)
return err
}
Loading
Loading