fix: dataflowflextemplatejob specialize subnetwork format
yuwenma committed Dec 12, 2024
1 parent 4db732a commit 6efdea6
Showing 7 changed files with 1,030 additions and 3 deletions.
2 changes: 1 addition & 1 deletion apis/refs/v1beta1/computerefs.go
@@ -200,7 +200,7 @@ func ResolveComputeSubnetwork(ctx context.Context, reader client.Reader, src cli
External: fmt.Sprintf("projects/%s/regions/%s/subnetworks/%s", projectID, region, subnetID),
}, nil
}
return nil, fmt.Errorf("format of computenetwork external=%q was not known (use projects/<projectId>/global/networks/<networkid>)", ref.External)
return nil, fmt.Errorf("format of computenetwork external=%q was not known (use projects/<projectId>/regions/<region>/subnetworks/<networkid>)", ref.External)
}

if ref.Name == "" {
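For orientation, here is a minimal sketch (not part of this commit) of a caller passing an already-resolved external reference in the format the updated error message now documents; the package name, helper name, and example project/region/subnetwork values are illustrative assumptions.

package example

import (
	"context"

	refs "github.com/GoogleCloudPlatform/k8s-config-connector/apis/refs/v1beta1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// resolveDefaultSubnet is a hypothetical helper: it supplies an external
// reference in the projects/<projectId>/regions/<region>/subnetworks/<networkid>
// form that the updated message points callers at.
func resolveDefaultSubnet(ctx context.Context, reader client.Reader, obj client.Object) (*refs.ComputeSubnetworkRef, error) {
	ref := &refs.ComputeSubnetworkRef{
		External: "projects/my-project/regions/us-central1/subnetworks/default",
	}
	return refs.ResolveComputeSubnetwork(ctx, reader, obj, ref)
}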
44 changes: 42 additions & 2 deletions pkg/controller/direct/dataflow/refs.go
@@ -16,6 +16,8 @@ package dataflow

import (
"context"
"fmt"
"strings"

refs "github.com/GoogleCloudPlatform/k8s-config-connector/apis/refs/v1beta1"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -59,7 +61,7 @@ func (r *refNormalizer) VisitField(path string, v any) error {
}

if subnetworkRef, ok := v.(*refs.ComputeSubnetworkRef); ok {
- resolved, err := refs.ResolveComputeSubnetwork(r.ctx, r.kube, r.src, subnetworkRef)
+ resolved, err := RefineComputeSubnetworkRef(r.ctx, r.kube, r.src, subnetworkRef)
if err != nil {
return err
}
@@ -69,7 +71,7 @@ if subnetworkRefs, ok := v.([]refs.ComputeSubnetworkRef); ok {
if subnetworkRefs, ok := v.([]refs.ComputeSubnetworkRef); ok {
for i := range subnetworkRefs {
subnetworkRef := &subnetworkRefs[i]
- resolved, err := refs.ResolveComputeSubnetwork(r.ctx, r.kube, r.src, subnetworkRef)
+ resolved, err := RefineComputeSubnetworkRef(r.ctx, r.kube, r.src, subnetworkRef)
if err != nil {
return err
}
@@ -79,3 +81,41 @@

return nil
}

// RefineComputeSubnetworkRef refines the subnetwork format because DataflowFlexTemplateJob has a specific format requirement:
// a subnetwork can be specified using either a complete URL or an abbreviated path.
//
// The value is expected to be of the form "https://www.googleapis.com/compute/v1/projects/HOST_PROJECT_ID/regions/REGION/subnetworks/SUBNETWORK"
// or "regions/REGION/subnetworks/SUBNETWORK". If the subnetwork is located in a Shared VPC network,
// the complete URL must be used.
func RefineComputeSubnetworkRef(ctx context.Context, reader client.Reader, src client.Object, ref *refs.ComputeSubnetworkRef) (*refs.ComputeSubnetworkRef, error) {
	if ref == nil {
		return nil, nil
	}
	// Use the common ComputeSubnetwork resolver for by-name references.
	if ref.External == "" {
		var err error
		ref, err = refs.ResolveComputeSubnetwork(ctx, reader, src, ref)
		if err != nil {
			return nil, err
		}
	}

	// Accept the abbreviated non-shared-VPC format, which the common ComputeSubnetwork resolver does not allow.
	tokens := strings.Split(ref.External, "/")
	if len(tokens) == 4 && tokens[0] == "regions" && tokens[2] == "subnetworks" {
		return &refs.ComputeSubnetworkRef{
			External: ref.External,
		}, nil
	}

	// Refine the shared-VPC (project-qualified) format to the complete URL.
	fullURLPrefix := "https://www.googleapis.com/compute/v1/"
	ref.External = strings.TrimPrefix(ref.External, fullURLPrefix)
	tokens = strings.Split(ref.External, "/")
	if len(tokens) == 6 && tokens[0] == "projects" && tokens[2] == "regions" && tokens[4] == "subnetworks" {
		return &refs.ComputeSubnetworkRef{
			External: fullURLPrefix + "projects/" + tokens[1] + "/regions/" + tokens[3] + "/subnetworks/" + tokens[5],
		}, nil
	}
	return nil, fmt.Errorf("format of subnetwork external=%q was not known, use regions/<region>/subnetworks/<subnetwork>, projects/<projectID>/regions/<region>/subnetworks/<subnetwork> or https://www.googleapis.com/compute/v1/projects/<projectID>/regions/<region>/subnetworks/<subnetwork>", ref.External)
}
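To make the accepted inputs concrete, here is a minimal test-style sketch (not part of this commit) of how the three documented formats would be normalized; the test name and the example project, region, and subnetwork values are assumptions for illustration.

package dataflow

import (
	"context"
	"testing"

	refs "github.com/GoogleCloudPlatform/k8s-config-connector/apis/refs/v1beta1"
)

func TestRefineComputeSubnetworkRefFormats(t *testing.T) {
	fullURL := "https://www.googleapis.com/compute/v1/projects/host-project/regions/us-central1/subnetworks/shared-subnet"
	cases := map[string]string{
		// Abbreviated, non-shared-VPC path is passed through unchanged.
		"regions/us-central1/subnetworks/default": "regions/us-central1/subnetworks/default",
		// Project-qualified path is expanded to the complete URL.
		"projects/host-project/regions/us-central1/subnetworks/shared-subnet": fullURL,
		// A complete URL stays a complete URL.
		fullURL: fullURL,
	}
	for in, want := range cases {
		// The reader and src object are unused when External is already set.
		got, err := RefineComputeSubnetworkRef(context.TODO(), nil, nil, &refs.ComputeSubnetworkRef{External: in})
		if err != nil {
			t.Fatalf("RefineComputeSubnetworkRef(%q): unexpected error: %v", in, err)
		}
		if got.External != want {
			t.Errorf("RefineComputeSubnetworkRef(%q) = %q, want %q", in, got.External, want)
		}
	}
}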
@@ -0,0 +1,36 @@
apiVersion: dataflow.cnrm.cloud.google.com/v1beta1
kind: DataflowFlexTemplateJob
metadata:
  annotations:
    alpha.cnrm.cloud.google.com/reconciler: direct
    cnrm.cloud.google.com/management-conflict-prevention-policy: none
    cnrm.cloud.google.com/on-delete: cancel
    cnrm.cloud.google.com/project-id: ${projectId}
  finalizers:
  - cnrm.cloud.google.com/finalizer
  - cnrm.cloud.google.com/deletion-defender
  generation: 1
  labels:
    cnrm-test: "true"
  name: dataflowflextemplatejob-${uniqueId}
  namespace: ${uniqueId}
spec:
  containerSpecGcsPath: gs://dataflow-templates/2022-10-03-00_RC00/flex/File_Format_Conversion
  parameters:
    inputFileFormat: csv
    inputFileSpec: gs://config-connector-samples/dataflowflextemplate/numbertest.csv
    outputBucket: gs://storagebucket-${uniqueId}
    outputFileFormat: avro
    schema: gs://config-connector-samples/dataflowflextemplate/numbers.avsc
  region: us-central1
status:
  conditions:
  - lastTransitionTime: "1970-01-01T00:00:00Z"
    message: The resource is up to date
    reason: UpToDate
    status: "True"
    type: Ready
  jobId: ${jobID}
  observedGeneration: 1
  state: JOB_STATE_RUNNING
  type: JOB_TYPE_BATCH
@@ -0,0 +1,38 @@
apiVersion: dataflow.cnrm.cloud.google.com/v1beta1
kind: DataflowFlexTemplateJob
metadata:
  annotations:
    alpha.cnrm.cloud.google.com/reconciler: direct
    cnrm.cloud.google.com/management-conflict-prevention-policy: none
    cnrm.cloud.google.com/on-delete: cancel
    cnrm.cloud.google.com/project-id: ${projectId}
  finalizers:
  - cnrm.cloud.google.com/finalizer
  - cnrm.cloud.google.com/deletion-defender
  generation: 1
  labels:
    cnrm-test: "true"
  name: dataflowflextemplatejob-${uniqueId}
  namespace: ${uniqueId}
spec:
  containerSpecGcsPath: gs://dataflow-templates/2020-08-31-00_RC00/flex/PubSub_Avro_to_BigQuery
  parameters:
    createDisposition: CREATE_NEVER
    inputSubscription: projects/${projectId}/subscriptions/pubsubsubscription-${uniqueId}
    outputTableSpec: ${projectId}:bigquerydataset${uniqueId}.bigquerytable${uniqueId}
    outputTopic: projects/${projectId}/topics/pubsubtopic0-${uniqueId}
    schemaPath: gs://config-connector-samples/dataflowflextemplate/numbers.avsc
  region: us-central1
  subnetworkRef:
    external: projects/${projectId}/regions/us-central1/subnetworks/default
status:
  conditions:
  - lastTransitionTime: "1970-01-01T00:00:00Z"
    message: The resource is up to date
    reason: UpToDate
    status: "True"
    type: Ready
  jobId: ${jobID}
  observedGeneration: 1
  state: JOB_STATE_RUNNING
  type: JOB_TYPE_STREAMING