Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): add DataGovernanceType to routines #8990

Merged
merged 5 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions bigquery/routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ type RoutineMetadata struct {
// For JAVASCRIPT function, it is the evaluated string in the AS clause of
// a CREATE FUNCTION statement.
Body string

// For data governance use cases. If set to "DATA_MASKING", the function
// is validated and made available as a masking function. For more information,
// see: https://cloud.google.com/bigquery/docs/user-defined-functions#custom-mask
DataGovernanceType string
}

// RemoteFunctionOptions contains information for a remote user-defined function.
Expand Down Expand Up @@ -278,6 +283,7 @@ func (rm *RoutineMetadata) toBQ() (*bq.Routine, error) {
r.Language = rm.Language
r.RoutineType = rm.Type
r.DefinitionBody = rm.Body
r.DataGovernanceType = rm.DataGovernanceType
rt, err := rm.ReturnType.toBQ()
if err != nil {
return nil, err
Expand Down Expand Up @@ -405,15 +411,16 @@ func routineArgumentsToBQ(in []*RoutineArgument) ([]*bq.Argument, error) {

// RoutineMetadataToUpdate governs updating a routine.
type RoutineMetadataToUpdate struct {
Arguments []*RoutineArgument
Description optional.String
DeterminismLevel optional.String
Type optional.String
Language optional.String
Body optional.String
ImportedLibraries []string
ReturnType *StandardSQLDataType
ReturnTableType *StandardSQLTableType
Arguments []*RoutineArgument
Description optional.String
DeterminismLevel optional.String
Type optional.String
Language optional.String
Body optional.String
ImportedLibraries []string
ReturnType *StandardSQLDataType
ReturnTableType *StandardSQLTableType
DataGovernanceType optional.String
}

func (rm *RoutineMetadataToUpdate) toBQ() (*bq.Routine, error) {
Expand Down Expand Up @@ -491,20 +498,25 @@ func (rm *RoutineMetadataToUpdate) toBQ() (*bq.Routine, error) {
r.ReturnTableType = tt
forceSend("ReturnTableType")
}
if rm.DataGovernanceType != nil {
r.DataGovernanceType = optional.ToString(rm.DataGovernanceType)
forceSend("DataGovernanceType")
}
return r, nil
}

func bqToRoutineMetadata(r *bq.Routine) (*RoutineMetadata, error) {
meta := &RoutineMetadata{
ETag: r.Etag,
Type: r.RoutineType,
CreationTime: unixMillisToTime(r.CreationTime),
Description: r.Description,
DeterminismLevel: RoutineDeterminism(r.DeterminismLevel),
LastModifiedTime: unixMillisToTime(r.LastModifiedTime),
Language: r.Language,
ImportedLibraries: r.ImportedLibraries,
Body: r.DefinitionBody,
ETag: r.Etag,
Type: r.RoutineType,
CreationTime: unixMillisToTime(r.CreationTime),
Description: r.Description,
DeterminismLevel: RoutineDeterminism(r.DeterminismLevel),
LastModifiedTime: unixMillisToTime(r.LastModifiedTime),
Language: r.Language,
ImportedLibraries: r.ImportedLibraries,
Body: r.DefinitionBody,
DataGovernanceType: r.DataGovernanceType,
}
args, err := bqToArgs(r.Arguments)
if err != nil {
Expand Down
43 changes: 36 additions & 7 deletions bigquery/routine_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import (
"fmt"
"testing"

"cloud.google.com/go/bigquery/connection/apiv1/connectionpb"
"cloud.google.com/go/internal"
"cloud.google.com/go/internal/testutil"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/api/iterator"
"google.golang.org/genproto/googleapis/cloud/bigquery/connection/v1"
)

func TestIntegration_RoutineScalarUDF(t *testing.T) {
Expand Down Expand Up @@ -53,6 +53,35 @@ func TestIntegration_RoutineScalarUDF(t *testing.T) {
}
}

func TestIntegration_RoutineDataGovernance(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()

// Create a scalar UDF routine via API.
routineID := routineIDs.New()
routine := dataset.Routine(routineID)
err := routine.Create(ctx, &RoutineMetadata{
Type: "SCALAR_FUNCTION",
Language: "SQL",
Body: "x",
Arguments: []*RoutineArgument{
{
Name: "x",
DataType: &StandardSQLDataType{
TypeKind: "INT64",
},
},
},
ReturnType: &StandardSQLDataType{TypeKind: "INT64"},
DataGovernanceType: "DATA_MASKING",
})
if err != nil {
t.Fatalf("Create: %v", err)
}
}

func TestIntegration_RoutineJSUDF(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
Expand Down Expand Up @@ -146,27 +175,27 @@ func TestIntegration_RoutineRemoteUDF(t *testing.T) {

func createConnection(ctx context.Context, t *testing.T, parent, name string) (cleanup func(), connectionID string, err error) {
fullname := fmt.Sprintf("%s/connections/%s", parent, name)
conn, err := connectionsClient.CreateConnection(ctx, &connection.CreateConnectionRequest{
conn, err := connectionsClient.CreateConnection(ctx, &connectionpb.CreateConnectionRequest{
Parent: parent,
ConnectionId: name,
Connection: &connection.Connection{
Connection: &connectionpb.Connection{
FriendlyName: name,
Properties: &connection.Connection_CloudResource{
CloudResource: &connection.CloudResourceProperties{},
Properties: &connectionpb.Connection_CloudResource{
CloudResource: &connectionpb.CloudResourceProperties{},
},
},
})
if err != nil {
return
}
conn, err = connectionsClient.GetConnection(ctx, &connection.GetConnectionRequest{
conn, err = connectionsClient.GetConnection(ctx, &connectionpb.GetConnectionRequest{
Name: fullname,
})
if err != nil {
return
}
cleanup = func() {
err := connectionsClient.DeleteConnection(ctx, &connection.DeleteConnectionRequest{
err := connectionsClient.DeleteConnection(ctx, &connectionpb.DeleteConnectionRequest{
Name: fullname,
})
if err != nil {
Expand Down
141 changes: 105 additions & 36 deletions bigquery/routine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ func testRoutineConversion(t *testing.T, conversion string, in interface{}, want
t.Fatalf("failed input type conversion (bq.Routine): %v", in)
}
got, err = bqToRoutineMetadata(input)
case "FromRoutineMetadata":
input, ok := in.(*RoutineMetadata)
if !ok {
t.Fatalf("failed input type conversion (bq.RoutineMetadata): %v", in)
}
got, err = input.toBQ()
case "FromRoutineMetadataToUpdate":
input, ok := in.(*RoutineMetadataToUpdate)
if !ok {
Expand All @@ -53,9 +59,8 @@ func testRoutineConversion(t *testing.T, conversion string, in interface{}, want
default:
t.Fatalf("invalid comparison: %s", conversion)
}

if err != nil {
t.Fatalf("failed conversion function for %q", conversion)
t.Fatalf("failed conversion function for %q: %v", conversion, err)
}
if diff := testutil.Diff(got, want); diff != "" {
t.Fatalf("%+v: -got, +want:\n%s", in, diff)
Expand All @@ -72,9 +77,22 @@ func TestRoutineTypeConversions(t *testing.T) {
in interface{}
want interface{}
}{
{"empty", "ToRoutineMetadata", &bq.Routine{}, &RoutineMetadata{}},
{"basic", "ToRoutineMetadata",
&bq.Routine{
{
shollyman marked this conversation as resolved.
Show resolved Hide resolved
name: "empty",
conversion: "ToRoutineMetadata",
in: &bq.Routine{},
want: &RoutineMetadata{},
},
{
name: "empty",
conversion: "FromRoutineMetadata",
in: &RoutineMetadata{},
want: &bq.Routine{},
},
{
name: "basic",
conversion: "ToRoutineMetadata",
in: &bq.Routine{
CreationTime: aTimeMillis,
LastModifiedTime: aTimeMillis,
DefinitionBody: "body",
Expand All @@ -89,8 +107,9 @@ func TestRoutineTypeConversions(t *testing.T) {
{Name: "field", Type: &bq.StandardSqlDataType{TypeKind: "FLOAT64"}},
},
},
DataGovernanceType: "DATA_MASKING",
},
&RoutineMetadata{
want: &RoutineMetadata{
CreationTime: aTime,
LastModifiedTime: aTime,
Description: "desc",
Expand All @@ -105,55 +124,106 @@ func TestRoutineTypeConversions(t *testing.T) {
{Name: "field", Type: &StandardSQLDataType{TypeKind: "FLOAT64"}},
},
},
}},
{"body_and_libs", "FromRoutineMetadataToUpdate",
&RoutineMetadataToUpdate{
Body: "body",
ImportedLibraries: []string{"foo", "bar"},
ReturnType: &StandardSQLDataType{TypeKind: "FOO"},
DataGovernanceType: "DATA_MASKING",
},
&bq.Routine{
DefinitionBody: "body",
ImportedLibraries: []string{"foo", "bar"},
ReturnType: &bq.StandardSqlDataType{TypeKind: "FOO"},
ForceSendFields: []string{"DefinitionBody", "ImportedLibraries", "ReturnType"},
}},
{"null_fields", "FromRoutineMetadataToUpdate",
&RoutineMetadataToUpdate{
},
{
name: "basic",
conversion: "FromRoutineMetadata",
in: &RoutineMetadata{
Description: "desc",
DeterminismLevel: Deterministic,
Body: "body",
Type: "type",
Language: "lang",
ReturnType: &StandardSQLDataType{TypeKind: "INT64"},
ReturnTableType: &StandardSQLTableType{
Columns: []*StandardSQLField{
{Name: "field", Type: &StandardSQLDataType{TypeKind: "FLOAT64"}},
},
},
DataGovernanceType: "DATA_MASKING",
},
want: &bq.Routine{
DefinitionBody: "body",
Description: "desc",
DeterminismLevel: "DETERMINISTIC",
RoutineType: "type",
Language: "lang",
ReturnType: &bq.StandardSqlDataType{TypeKind: "INT64"},
ReturnTableType: &bq.StandardSqlTableType{
Columns: []*bq.StandardSqlField{
{Name: "field", Type: &bq.StandardSqlDataType{TypeKind: "FLOAT64"}},
},
},
DataGovernanceType: "DATA_MASKING",
},
},
{
name: "body_and_libs",
conversion: "FromRoutineMetadataToUpdate",
in: &RoutineMetadataToUpdate{
Body: "body",
ImportedLibraries: []string{"foo", "bar"},
ReturnType: &StandardSQLDataType{TypeKind: "FOO"},
DataGovernanceType: "DATA_MASKING",
},
want: &bq.Routine{
DefinitionBody: "body",
ImportedLibraries: []string{"foo", "bar"},
ReturnType: &bq.StandardSqlDataType{TypeKind: "FOO"},
DataGovernanceType: "DATA_MASKING",
ForceSendFields: []string{"DefinitionBody", "ImportedLibraries", "ReturnType", "DataGovernanceType"},
},
},
{
name: "null_fields",
conversion: "FromRoutineMetadataToUpdate",
in: &RoutineMetadataToUpdate{
Type: "type",
Arguments: []*RoutineArgument{},
ImportedLibraries: []string{},
},
&bq.Routine{
want: &bq.Routine{
RoutineType: "type",
ForceSendFields: []string{"RoutineType"},
NullFields: []string{"Arguments", "ImportedLibraries"},
}},
{"empty", "ToRoutineArgument",
&bq.Argument{},
&RoutineArgument{}},
{"basic", "ToRoutineArgument",
&bq.Argument{
},
},
{
name: "empty",
conversion: "ToRoutineArgument",
in: &bq.Argument{},
want: &RoutineArgument{}},
{
name: "basic",
conversion: "ToRoutineArgument",
in: &bq.Argument{
Name: "foo",
ArgumentKind: "bar",
Mode: "baz",
},
&RoutineArgument{
want: &RoutineArgument{
Name: "foo",
Kind: "bar",
Mode: "baz",
}},
{"empty", "FromRoutineArgument",
&RoutineArgument{},
&bq.Argument{},
},
},
{"basic", "FromRoutineArgument",
&RoutineArgument{
{
name: "empty",
conversion: "FromRoutineArgument",
in: &RoutineArgument{},
want: &bq.Argument{},
},
{
name: "basic",
conversion: "FromRoutineArgument",
in: &RoutineArgument{
Name: "foo",
Kind: "bar",
Mode: "baz",
},
&bq.Argument{
want: &bq.Argument{
Name: "foo",
ArgumentKind: "bar",
Mode: "baz",
Expand All @@ -162,7 +232,6 @@ func TestRoutineTypeConversions(t *testing.T) {

for _, test := range tests {
t.Run(fmt.Sprintf("%s/%s", test.conversion, test.name), func(t *testing.T) {
t.Parallel()
testRoutineConversion(t, test.conversion, test.in, test.want)
})
}
Expand Down
Loading