Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add direct controller for BigQueryDataTransferConfig #2688

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var BigQueryDataTransferConfigGVK = GroupVersion.WithKind("BigQueryDataTransferC
// +kcc:proto=google.cloud.bigquery.datatransfer.v1.EncryptionConfiguration
type EncryptionConfiguration struct {
// The KMS key used for encrypting BigQuery data.
KmsKeyRef refv1beta1.KMSCryptoKeyRef `json:"kmsKeyRef,omitempty"`
KmsKeyRef *refv1beta1.KMSCryptoKeyRef `json:"kmsKeyRef,omitempty"`
}

// BigQueryDataTransferConfigSpec defines the desired state of BigQueryDataTransferConfig
Expand All @@ -44,10 +44,12 @@ type BigQueryDataTransferConfigSpec struct {
// Data source ID. This cannot be changed once data transfer is created. The
// full list of available data source IDs can be returned through an API call:
// https://cloud.google.com/bigquery-transfer/docs/reference/datatransfer/rest/v1/projects.locations.dataSources/list
// +required
DataSourceID *string `json:"dataSourceID,omitempty"`

// The BigQuery target dataset id.
DatasetRef refv1beta1.BigQueryDatasetRef `json:"datasetRef,omitempty"`
// +required
DatasetRef *refv1beta1.BigQueryDatasetRef `json:"datasetRef,omitempty"`

// Is this config disabled. When set to true, no runs will be scheduled for
// this transfer config.
Expand All @@ -69,12 +71,13 @@ type BigQueryDataTransferConfigSpec struct {

// Pub/Sub topic where notifications will be sent after transfer runs
// associated with this transfer config finish.
PubSubTopicRef refv1beta1.PubSubTopicRef `json:"pubSubTopicRef,omitempty"`
PubSubTopicRef *refv1beta1.PubSubTopicRef `json:"pubSubTopicRef,omitempty"`

// Parameters specific to each data source. For more information see the
// bq tab in the 'Setting up a data transfer' section for each data source.
// For example the parameters for Cloud Storage transfers are listed here:
// https://cloud.google.com/bigquery-transfer/docs/cloud-storage-transfer#bq
// +required
Params map[string]string `json:"params,omitempty"`

Parent `json:",inline"`
Expand Down Expand Up @@ -103,8 +106,11 @@ type BigQueryDataTransferConfigSpec struct {
// Options customizing the data transfer schedule.
ScheduleOptions *ScheduleOptions `json:"scheduleOptions,omitempty"`

// Deprecated. Unique ID of the user on whose behalf transfer is done.
UserID *int64 `json:"userID,omitempty"`
// Service account email. If this field is set, the transfer config will be created with this service account's credentials.
// It requires that the requesting user calling this API has permissions to act as this service account.
// Note that not all data sources support service account credentials when creating a transfer config.
// For the latest list of data sources, please refer to https://cloud.google.com/bigquery/docs/use-service-accounts.
ServiceAccountRef *refv1beta1.IAMServiceAccountRef `json:"serviceAccountRef,omitempty"`
}

type Parent struct {
Expand Down Expand Up @@ -161,6 +167,9 @@ type BigQueryDataTransferConfigObservedState struct {

// Output only. Data transfer modification time. Ignored by server on input.
UpdateTime *string `json:"updateTime,omitempty"`

// Deprecated. Unique ID of the user on whose behalf transfer is done.
UserID *int64 `json:"userID,omitempty"`
}

// +genclient
Expand Down
29 changes: 23 additions & 6 deletions apis/bigquerydatatransfer/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

91 changes: 88 additions & 3 deletions apis/refs/v1beta1/bigqueryref.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,96 @@

package v1beta1

import (
"context"
"fmt"
"strings"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type BigQueryDatasetRef struct {
// If provided must be in the format `bigquery.googleapis.com/projects/[project_id]/datasets/[dataset_id]`.
// If provided must be in the format `projects/[project_id]/datasets/[dataset_id]`.
External string `json:"external,omitempty"`
// The `metadata.name` field of a `PubSubTopic` resource.
// The `metadata.name` field of a `BigQueryDataset` resource.
Name string `json:"name,omitempty"`
// The `metadata.namespace` field of a `PubSubTopic` resource.
// The `metadata.namespace` field of a `BigQueryDataset` resource.
Namespace string `json:"namespace,omitempty"`
}

type BigQueryDataset struct {
projectID string
datasetID string
}

func ResolveBigQueryDataset(ctx context.Context, reader client.Reader, src client.Object, ref *BigQueryDatasetRef) (*BigQueryDataset, error) {
if ref == nil {
return nil, nil
}

if ref.Name == "" && ref.External == "" {
return nil, fmt.Errorf("must specify either name or external on BigQueryDatasetRef")
}
if ref.Name != "" && ref.External != "" {
return nil, fmt.Errorf("cannot specify both name and external on BigQueryDatasetRef")
}

// External is provided.
if ref.External != "" {
// External should be in the `projects/[project_id]/datasets/[dataset_id]` format.
tokens := strings.Split(ref.External, "/")
if len(tokens) == 4 && tokens[0] == "projects" && tokens[2] == "datasets" {
return &BigQueryDataset{
projectID: tokens[1],
datasetID: tokens[3],
}, nil
}
return nil, fmt.Errorf("format of BigQueryDatasetRef external=%q was not known (use projects/[project_id]/datasets/[dataset_id])", ref.External)

}

// Fetch BigQueryDataset object to construct the external form.
dataset := &unstructured.Unstructured{}
dataset.SetGroupVersionKind(schema.GroupVersionKind{
Group: "bigquery.cnrm.cloud.google.com",
Version: "v1beta1",
Kind: "BigQueryDataset",
})
nn := types.NamespacedName{
Namespace: ref.Namespace,
Name: ref.Name,
}
if nn.Namespace == "" {
nn.Namespace = src.GetNamespace()
}
if err := reader.Get(ctx, nn, dataset); err != nil {
if apierrors.IsNotFound(err) {
return nil, fmt.Errorf("referenced BigQueryDataset %v not found", nn)
}
return nil, fmt.Errorf("error reading referenced BigQueryDataset %v: %w", nn, err)
}
projectID, err := ResolveProjectID(ctx, reader, dataset)
if err != nil {
return nil, err
}
datasetID, err := GetResourceID(dataset)
if err != nil {
return nil, err
}
return &BigQueryDataset{
projectID: projectID,
datasetID: datasetID,
}, nil
}

func (d *BigQueryDataset) String() string {
return fmt.Sprintf("projects/%s/datasets/%s", d.projectID, d.datasetID)
}

func (d *BigQueryDataset) GetDatasetID() string {
return d.datasetID
}
80 changes: 80 additions & 0 deletions apis/refs/v1beta1/pubsubref.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@

package v1beta1

import (
"context"
"fmt"
"strings"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type PubSubTopicRef struct {
// If provided must be in the format `projects/[project_id]/topics/[topic_id]`.
External string `json:"external,omitempty"`
Expand All @@ -22,3 +34,71 @@ type PubSubTopicRef struct {
// The `metadata.namespace` field of a `PubSubTopic` resource.
Namespace string `json:"namespace,omitempty"`
}

type PubSubTopic struct {
projectID string
topicID string
}

func ResolvePubSubTopic(ctx context.Context, reader client.Reader, src client.Object, ref *PubSubTopicRef) (*PubSubTopic, error) {
if ref == nil {
return nil, nil
}

if ref.Name == "" && ref.External == "" {
return nil, fmt.Errorf("must specify either name or external on PubSubTopicRef")
}
if ref.Name != "" && ref.External != "" {
return nil, fmt.Errorf("cannot specify both name and external on PubSubTopicRef")
}

// External is provided.
if ref.External != "" {
// External should be in the `projects/[project_id]/topics/[topic_id]` format.
tokens := strings.Split(ref.External, "/")
if len(tokens) == 4 && tokens[0] == "projects" && tokens[2] == "topics" {
return &PubSubTopic{
projectID: tokens[1],
topicID: tokens[3],
}, nil
}
return nil, fmt.Errorf("format of PubSubTopicRef external=%q was not known (use projects/[project_id]/topics/[topic_id])", ref.External)
}

// Fetch PubSubTopic object to construct the external form.
pubSubTopic := &unstructured.Unstructured{}
pubSubTopic.SetGroupVersionKind(schema.GroupVersionKind{
Group: "pubsub.cnrm.cloud.google.com",
Version: "v1beta1",
Kind: "PubSubTopic",
})
nn := types.NamespacedName{
Namespace: ref.Namespace,
Name: ref.Name,
}
if nn.Namespace == "" {
nn.Namespace = src.GetNamespace()
}
if err := reader.Get(ctx, nn, pubSubTopic); err != nil {
if apierrors.IsNotFound(err) {
return nil, fmt.Errorf("referenced PubSubTopic %v not found", nn)
}
return nil, fmt.Errorf("error reading referenced PubSubTopic %v: %w", nn, err)
}
projectID, err := ResolveProjectID(ctx, reader, pubSubTopic)
if err != nil {
return nil, err
}
topicID, err := GetResourceID(pubSubTopic)
if err != nil {
return nil, err
}
return &PubSubTopic{
projectID: projectID,
topicID: topicID,
}, nil
}

func (t *PubSubTopic) String() string {
return fmt.Sprintf("projects/%s/topics/%s", t.projectID, t.topicID)
}
Loading
Loading