From 8d1440b40684d1be02035ad9c3d34f4482369239 Mon Sep 17 00:00:00 2001 From: justinsb Date: Wed, 28 Aug 2024 14:11:50 -0400 Subject: [PATCH] feat: better updates for DataflowFlexTemplateJob in direct controller --- .../dataflowflextemplatejob_controller.go | 242 ++++++-- .../directbase/directbase_controller.go | 20 +- .../direct/directbase/operations.go | 162 ++++- ...dataflowflextemplatejob-direct.golden.yaml | 6 +- .../_http.log | 563 ------------------ ...reamingdataflowflextemplatejob.golden.yaml | 4 +- .../_http.log | 2 +- .../create.yaml | 2 +- 8 files changed, 367 insertions(+), 634 deletions(-) diff --git a/pkg/controller/direct/dataflow/dataflowflextemplatejob_controller.go b/pkg/controller/direct/dataflow/dataflowflextemplatejob_controller.go index 8fb535c2e4f..e32d08e9f14 100644 --- a/pkg/controller/direct/dataflow/dataflowflextemplatejob_controller.go +++ b/pkg/controller/direct/dataflow/dataflowflextemplatejob_controller.go @@ -24,12 +24,15 @@ import ( pb "cloud.google.com/go/dataflow/apiv1beta3/dataflowpb" krm "github.com/GoogleCloudPlatform/k8s-config-connector/apis/dataflow/v1beta1" refs "github.com/GoogleCloudPlatform/k8s-config-connector/apis/refs/v1beta1" + "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/k8s/v1alpha1" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/config" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/directbase" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/monitoring" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/registry" + "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s" "google.golang.org/protobuf/proto" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" @@ -330,30 +333,75 @@ func (a *dataflowFlexTemplateJobAdapter) Create(ctx context.Context, createOp *d } job := response.GetJob() - jobID := job.GetId() + + if err := a.updateStatus(ctx, createOp, job); err != nil { + return err + } + + return nil + + // ready := false + // for !ready { + // time.Sleep(2 * time.Second) + // latest, err := a.getJob(ctx, jobID) + // if err != nil { + // // TODO: not right! + // return fmt.Errorf("getting state of job") + // } + // switch latest.CurrentState { + // case pb.JobState_JOB_STATE_RUNNING: + // ready = true + // default: + // log.Info("unexpected job state waiting for job running", "state", latest.CurrentState) + // } + // } + + // return setStatus(u, status) +} + +func (a *dataflowFlexTemplateJobAdapter) updateStatus(ctx context.Context, op directbase.Operation, job *pb.Job) error { // TODO: Use jobCreateTime for mutation checking? // jobCreateTime := job.CreateTime() - ready := false - for !ready { - time.Sleep(2 * time.Second) - latest, err := a.getJob(ctx, jobID) - if err != nil { - // TODO: not right! - return fmt.Errorf("getting state of job") + status := &krm.DataflowFlexTemplateJobStatus{ + JobID: job.GetId(), + CurrentState: direct.PtrTo(job.CurrentState.String()), + Type: direct.PtrTo(job.Type.String()), + } + + var readyCondition *v1alpha1.Condition + + switch job.CurrentState { + case pb.JobState_JOB_STATE_RUNNING: + readyCondition = &v1alpha1.Condition{ + Type: v1alpha1.ReadyConditionType, + Status: v1.ConditionTrue, + Reason: k8s.UpToDate, + Message: "The resource is up to date", } - switch latest.CurrentState { - case pb.JobState_JOB_STATE_RUNNING: - ready = true - default: - log.Info("unexpected job state waiting for job running", "state", latest.CurrentState) + + case pb.JobState_JOB_STATE_FAILED: + readyCondition = &v1alpha1.Condition{ + Type: v1alpha1.ReadyConditionType, + Status: v1.ConditionFalse, + Reason: k8s.UpToDate, + Message: "Dataflow job is in FAILED state", + } + + default: + readyCondition = &v1alpha1.Condition{ + Type: v1alpha1.ReadyConditionType, + Status: v1.ConditionFalse, + Reason: k8s.Updating, + Message: fmt.Sprintf("Waiting for Dataflow job to be running (state is %v)", job.CurrentState), } } - status := &krm.DataflowFlexTemplateJobStatus{ - JobID: jobID, + if err := op.UpdateStatus(ctx, status, readyCondition); err != nil { + return fmt.Errorf("updating status: %w", err) } - return setStatus(u, status) + + return nil } // Update implements the Adapter interface. @@ -363,7 +411,31 @@ func (a *dataflowFlexTemplateJobAdapter) Update(ctx context.Context, updateOp *d log := klog.FromContext(ctx) log.V(0).Info("updating object", "u", u) - if true { + observedGeneration, _, err := unstructured.NestedInt64(u.Object, "status", "observedGeneration") + if err != nil { + return fmt.Errorf("reading status.observedGeneration: %w", err) + } + metadataGeneration := u.GetGeneration() + + // We rely on the FlexJob being immutable, so drift at the GCP level should not be possible. + // Instead, we reconcile whenever we see a different spec.generation + if observedGeneration == metadataGeneration { + log.V(0).Info("object status.observedGeneration matches metadata.generation, skipping reconcile", "status.observedGeneration", observedGeneration, "metadata.generation", metadataGeneration) + + j, _ := u.MarshalJSON() + log.V(0).Info("object is", "json", string(j)) + + // If we are waiting on the existing job, update the status + if a.actual != nil { + if err := a.updateStatus(ctx, updateOp, a.actual); err != nil { + return err + } + + // TODO: If job fails to update, we probably need to "put back" the old job id + // But we also want to avoid repeatedly launching a failing job... + // The current behaviour will only try to launch once, which seems reasonable. + } + return nil } @@ -383,35 +455,123 @@ func (a *dataflowFlexTemplateJobAdapter) Update(ctx context.Context, updateOp *d } job := response.GetJob() - jobID := job.GetId() - - // TODO: Use jobCreateTime for mutation checking? - // jobCreateTime := job.CreateTime() - status := &krm.DataflowFlexTemplateJobStatus{ - JobID: jobID, + if err := a.updateStatus(ctx, updateOp, job); err != nil { + return err } - return setStatus(u, status) + + return nil } func (a *dataflowFlexTemplateJobAdapter) fullyQualifiedName() string { return fmt.Sprintf("projects/%s/locations/%s/clusters/%s", a.projectID, a.location, a.resourceID) } -func setStatus(u *unstructured.Unstructured, typedStatus any) error { - status, err := runtime.DefaultUnstructuredConverter.ToUnstructured(typedStatus) - if err != nil { - return fmt.Errorf("error converting status to unstructured: %w", err) - } - - old, _, _ := unstructured.NestedMap(u.Object, "status") - if old != nil { - status["conditions"] = old["conditions"] - status["observedGeneration"] = old["observedGeneration"] - status["externalRef"] = old["externalRef"] - } - - u.Object["status"] = status - - return nil -} +// func setStatus(u *unstructured.Unstructured, typedStatus any) error { +// status, err := runtime.DefaultUnstructuredConverter.ToUnstructured(typedStatus) +// if err != nil { +// return fmt.Errorf("error converting status to unstructured: %w", err) +// } + +// old, _, _ := unstructured.NestedMap(u.Object, "status") +// if old != nil { +// status["conditions"] = old["conditions"] +// status["observedGeneration"] = old["observedGeneration"] +// status["externalRef"] = old["externalRef"] +// } + +// u.Object["status"] = status + +// return nil +// } + +// // ShouldReconcileBasedOnEtag checks if we should reconcile based on the GCP etag matching the KRM etag. +// // If the etag in KRM status is the same as the GCP etag, we consider the GCP object not to have changed. +// // We also consider the object to have changes if the KRM object generation != observedGeneration (spec changes), +// // and we also reconcile again if the object is not healthy (based on status.conditions). +// // +// // A few problems with the approach: +// // * We miss changes due to labels or annotations. +// // * If there's a change in the GCP object that isn't reflected in etag, we miss that (seems unlikely) +// // * Because we set spec.resourceID, we do an extra reconciliation after first creation (because we bump generation). +// func getObservedGeneration(u *unstructured.Unstructured) bool { +// obj := &objectWithEtag{} +// observedGeneration, _, _ := unstructured.NestedInt64(u.Object, "status", "observedGeneration") +// if observedGeneration != 0 { +// obj.Status.ObservedGeneration = &observedGeneration +// } + +// ) +// if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &obj); err != nil { +// return nil, fmt.Errorf("") +// } + +// if u.GetGeneration() != direct.ValueOf(obj.Status.ObservedGeneration) { +// log.V(2).Info("generation does not match", "generation", u.GetGeneration(), "observedGeneration", direct.ValueOf(obj.Status.ObservedGeneration)) +// return true +// } + +// // if gcpEtag == "" { +// // log.V(2).Info("etag not set in GCP") +// // return true +// // } + +// // objectEtag := direct.ValueOf(obj.Status.ObservedState.Etag) +// // if objectEtag == "" { +// // objectEtag = direct.ValueOf(obj.Status.Etag) +// // } + +// // if objectEtag == "" { +// // log.V(2).Info("etag not set in KRM object") +// // return true +// // } + +// // if gcpEtag != objectEtag { +// // log.V(2).Info("object status etag does not match gcp updateTime", "objectEtag", objectEtag, "gcpEtag", gcpEtag) +// // return true +// // } + +// if obj.Status.Conditions != nil { +// // if there was a previously failing update let's make sure we give +// // the update a chance to heal or keep marking it as failed + +// ready := false +// for _, condition := range obj.Status.Conditions { +// if condition.Type == v1alpha1.ReadyConditionType { +// if condition.Status == corev1.ConditionTrue { +// ready = true +// } +// } +// } + +// if !ready { +// log.V(2).Info("status.conditions indicates object is not ready yet") +// return true +// } +// } + +// log.V(2).Info("object etag matches gcp etag", "objectEtag", objectEtag, "gcpEtag", gcpEtag) +// return false +// } + +// // objectWithEtag holds the fields that are relevant to an etag-based change detection. +// type objectWithEtag struct { +// Status objectWithEtagtatus `json:"status"` +// } + +// type objectWithEtagtatus struct { +// Conditions []v1alpha1.Condition `json:"conditions,omitempty"` + +// // Used if status.observedState.etag is not set +// Etag *string `json:"etag,omitempty"` + +// // Compared to the object's generation to detect spec changes +// ObservedGeneration *int64 `json:"observedGeneration,omitempty"` + +// ObservedState objectWithEtagObservedState `json:"observedState,omitempty"` +// } + +// type objectWithEtagObservedState struct { +// // Checked before status.etag +// Etag *string `json:"etag,omitempty"` +// } diff --git a/pkg/controller/direct/directbase/directbase_controller.go b/pkg/controller/direct/directbase/directbase_controller.go index 9768454b7a1..1d8d2b32e65 100644 --- a/pkg/controller/direct/directbase/directbase_controller.go +++ b/pkg/controller/direct/directbase/directbase_controller.go @@ -265,7 +265,7 @@ func (r *reconcileContext) doReconcile(ctx context.Context, u *unstructured.Unst return true, nil } if !k8s.HasAbandonAnnotation(u) { - deleteOp := NewDeleteOperation(u) + deleteOp := NewDeleteOperation(r.Reconciler.Client, u) if _, err := adapter.Delete(ctx, deleteOp); err != nil { if !errors.Is(err, k8s.ErrIAMNotFound) && !k8s.IsReferenceNotFoundError(err) { if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok { @@ -291,8 +291,11 @@ func (r *reconcileContext) doReconcile(ctx context.Context, u *unstructured.Unst // set the etag to an empty string, since IAMPolicy is the authoritative intent, KCC wants to overwrite the underlying policy regardless //policy.Spec.Etag = "" + hasSetReadyCondition := false + shouldRequeue := false + if !existsAlready { - createOp := NewCreateOperation(u) + createOp := NewCreateOperation(r.Reconciler.Client, u) if err := adapter.Create(ctx, createOp); err != nil { if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok { logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(u)) @@ -300,8 +303,10 @@ func (r *reconcileContext) doReconcile(ctx context.Context, u *unstructured.Unst } return false, r.handleUpdateFailed(ctx, u, fmt.Errorf("error creating: %w", err)) } + hasSetReadyCondition = createOp.HasSetReadyCondition + shouldRequeue = createOp.ShouldRequeue } else { - updateOp := NewUpdateOperation(u) + updateOp := NewUpdateOperation(r.Reconciler.Client, u) if err := adapter.Update(ctx, updateOp); err != nil { if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok { logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(u)) @@ -309,11 +314,14 @@ func (r *reconcileContext) doReconcile(ctx context.Context, u *unstructured.Unst } return false, r.handleUpdateFailed(ctx, u, fmt.Errorf("error updating: %w", err)) } + hasSetReadyCondition = updateOp.HasSetReadyCondition + shouldRequeue = updateOp.ShouldRequeue } - if isAPIServerUpdateRequired(u) { - return false, r.handleUpToDate(ctx, u) + + if !hasSetReadyCondition && isAPIServerUpdateRequired(u) { + return shouldRequeue, r.handleUpToDate(ctx, u) } - return false, nil + return shouldRequeue, nil } // ensureFinalizers will apply our finalizers to the object if they are not present. diff --git a/pkg/controller/direct/directbase/operations.go b/pkg/controller/direct/directbase/operations.go index 89acf18bf3e..927a7faaf8a 100644 --- a/pkg/controller/direct/directbase/operations.go +++ b/pkg/controller/direct/directbase/operations.go @@ -14,41 +14,69 @@ package directbase -import "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +import ( + "context" + "fmt" + "time" + + "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/k8s/v1alpha1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// operationBase defines common functionality for multiple operation types. +type operationBase struct { + client client.Client -type UpdateOperation struct { object *unstructured.Unstructured + + // HasSetReadyCondition tracks whether the controller explcitly set the ready condition + HasSetReadyCondition bool + + // ShouldRequeue tracks whether we need a re-reconciliation + ShouldRequeue bool } -func NewUpdateOperation(object *unstructured.Unstructured) *UpdateOperation { - return &UpdateOperation{ - object: object, - } +// Operation defines some functionality supported by all operation types. +type Operation interface { + UpdateStatus(ctx context.Context, typedStatus any, readyCondition *v1alpha1.Condition) error } -func (o *UpdateOperation) GetUnstructured() *unstructured.Unstructured { +// GetUnstructured returns the object being reconciled, in unstructured format. +func (o *operationBase) GetUnstructured() *unstructured.Unstructured { return o.object } -type CreateOperation struct { - object *unstructured.Unstructured +type UpdateOperation struct { + operationBase } -func NewCreateOperation(object *unstructured.Unstructured) *CreateOperation { - return &CreateOperation{ - object: object, - } +func NewUpdateOperation(client client.Client, object *unstructured.Unstructured) *UpdateOperation { + op := &UpdateOperation{} + op.client = client + op.object = object + return op } -func (o *CreateOperation) GetUnstructured() *unstructured.Unstructured { - return o.object +type CreateOperation struct { + operationBase +} + +func NewCreateOperation(client client.Client, object *unstructured.Unstructured) *CreateOperation { + op := &CreateOperation{} + op.client = client + op.object = object + return op } type DeleteOperation struct { object *unstructured.Unstructured } -func NewDeleteOperation(object *unstructured.Unstructured) *DeleteOperation { +func NewDeleteOperation(client client.Client, object *unstructured.Unstructured) *DeleteOperation { return &DeleteOperation{ object: object, } @@ -57,3 +85,105 @@ func NewDeleteOperation(object *unstructured.Unstructured) *DeleteOperation { func (o *DeleteOperation) GetUnstructured() *unstructured.Unstructured { return o.object } + +func (o *operationBase) UpdateStatus(ctx context.Context, typedStatus any, readyCondition *v1alpha1.Condition) error { + status, err := runtime.DefaultUnstructuredConverter.ToUnstructured(typedStatus) + if err != nil { + return fmt.Errorf("error converting status to unstructured: %w", err) + } + + old, _, _ := unstructured.NestedMap(o.object.Object, "status") + if old != nil { + if status["conditions"] == nil { + status["conditions"] = old["conditions"] + } + // status["observedGeneration"] = old["observedGeneration"] + if status["externalRef"] == nil { + status["externalRef"] = old["externalRef"] + } + } + + status["observedGeneration"] = o.object.GetGeneration() + + if readyCondition != nil { + o.HasSetReadyCondition = true + if readyCondition.Status != v1.ConditionTrue { + o.ShouldRequeue = true + } + var statusWithConditions statusWithConditions + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(status, &statusWithConditions); err != nil { + return fmt.Errorf("error converting status.conditions from structured: %w", err) + } + + // Must be non-nil (for unclear reasons!) + if statusWithConditions.Conditions == nil { + statusWithConditions.Conditions = []v1alpha1.Condition{} + } + SetStatusCondition(&statusWithConditions.Conditions, *readyCondition) + + unstructuredStatusWithConditions, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&statusWithConditions) + if err != nil { + return fmt.Errorf("error converting status.conditions to unstructured: %w", err) + } + + status["conditions"] = unstructuredStatusWithConditions["conditions"] + } + + u := o.object + u.Object["status"] = status + + if err := o.client.Status().Update(ctx, u); err != nil { + return fmt.Errorf("updating object status: %w", err) + } + + return nil +} + +type statusWithConditions struct { + Conditions []v1alpha1.Condition `json:"conditions,omitempty"` +} + +// SetStatusCondition is lifted from metav1.SetStatusCondition, but adapted to v1alpha1.Condition + +// SetStatusCondition sets the corresponding condition in conditions to newCondition. +// conditions must be non-nil. +// 1. if the condition of the specified type already exists (all fields of the existing condition are updated to +// newCondition, LastTransitionTime is set to now if the new status differs from the old status) +// 2. if a condition of the specified type does not exist (LastTransitionTime is set to now() if unset, and newCondition is appended) +func SetStatusCondition(conditions *[]v1alpha1.Condition, newCondition v1alpha1.Condition) { + if conditions == nil { + return + } + existingCondition := FindStatusCondition(*conditions, newCondition.Type) + if existingCondition == nil { + if newCondition.LastTransitionTime == "" { + newCondition.LastTransitionTime = metav1.Now().Format(time.RFC3339) + } + *conditions = append(*conditions, newCondition) + return + } + + if existingCondition.Status != newCondition.Status { + existingCondition.Status = newCondition.Status + if newCondition.LastTransitionTime != "" { + existingCondition.LastTransitionTime = newCondition.LastTransitionTime + } else { + existingCondition.LastTransitionTime = metav1.Now().Format(time.RFC3339) + } + } + + existingCondition.Reason = newCondition.Reason + existingCondition.Message = newCondition.Message + // existingCondition.ObservedGeneration = newCondition.ObservedGeneration +} + +// FindStatusCondition finds the conditionType in conditions. +func FindStatusCondition(conditions []v1alpha1.Condition, conditionType string) *v1alpha1.Condition { + for i := range conditions { + if conditions[i].Type == conditionType { + return &conditions[i] + } + } + + return nil +} diff --git a/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/_generated_object_batchdataflowflextemplatejob-direct.golden.yaml b/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/_generated_object_batchdataflowflextemplatejob-direct.golden.yaml index 4a0768448c1..c6389cbce94 100644 --- a/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/_generated_object_batchdataflowflextemplatejob-direct.golden.yaml +++ b/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/_generated_object_batchdataflowflextemplatejob-direct.golden.yaml @@ -6,9 +6,6 @@ metadata: cnrm.cloud.google.com/management-conflict-prevention-policy: none cnrm.cloud.google.com/on-delete: cancel cnrm.cloud.google.com/project-id: ${projectId} - finalizers: - - cnrm.cloud.google.com/finalizer - - cnrm.cloud.google.com/deletion-defender generation: 1 labels: cnrm-test: "true" @@ -26,9 +23,10 @@ spec: status: conditions: - lastTransitionTime: "1970-01-01T00:00:00Z" - message: The resource is up to date reason: UpToDate status: "True" type: Ready jobId: ${jobID} observedGeneration: 1 + state: JOB_STATE_RUNNING + type: JOB_TYPE_BATCH diff --git a/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/_http.log b/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/_http.log index 337de31165b..a13ae89a61b 100644 --- a/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/_http.log +++ b/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/_http.log @@ -239,184 +239,6 @@ X-Content-Type-Options: nosniff X-Frame-Options: SAMEORIGIN X-Xss-Protection: 0 -{ - "createTime": "2024-04-01T12:34:56.123456Z", - "currentState": 9, - "currentStateTime": "2024-04-01T12:34:56.123456Z", - "environment": { - "dataset": "bigquery.googleapis.com/cloud_dataflow", - "experiments": [ - "auto_google_template_runner_v2", - "auto_high_core_runner_v2", - "auto_runner_v2_min_sdk=2.54.0", - "configure_shuffle_service_addresses_in_control_plane", - "delayed_launch", - "disable_baggins_exp", - "disable_primeflex", - "disable_runner_v2_reason=java_job_google_template", - "ek_regions=", - "enable_always_on_exception_sampling", - "enable_async_job_creation", - "enable_billing_v_1_5", - "enable_cloud_permissions_checking", - "enable_cmek_org_policy_check", - "enable_compute_default_service_account_org_policy", - "enable_data_sampling_telemetry", - "enable_dataprep_new_billing", - "enable_fnapi_multimap_side_input_bulk_read", - "enable_memory_sampler", - "enable_oom_sampler", - "enable_recommendations", - "enable_remote_image_ping", - "enable_secure_boot", - "enable_throttled_based_rescaling", - "enable_worker_cloud_monitoring_exporter", - "enable_worker_disk_cloud_monitoring", - "enable_worker_memory_cloud_monitoring", - "enable_zonal_outage_aware_routing", - "limit_preemptible_worker_pct", - "limit_resizing_by_cpu_util", - "min_sdk_version_to_reject_worker_in_different_region_than_service=2.44.0", - "override_controller_service_account", - "primeflex_slow_start_pct=5", - "primeflex_slow_start_seconds=3600", - "regional_physical_zone_separation_enabled", - "shuffle_mode=auto", - "shuffle_service_address_type=DIRECTPATH_WITH_CFE_FALLBACK", - "sideinput_io_metrics", - "use_dataflow_service_account_in_igm", - "use_e2_for_default_machine_type_worker_regions=africa-south1,europe-north2,europe-southwest1,europe-west10,europe-west12,europe-west8,europe-west9,me-central1,me-central2,me-west1,northamerica-south1,southamerica-west1,us-east10,us-east5,us-east7,us-south1,us-west8", - "use_job_admission_controller", - "use_multi_hop_delegation", - "use_templates_regional_bucket", - "use_worker_zone_chooser_by_default" - ], - "sdkPipelineOptions": { - "display_data": [], - "options": { - "apiRootUrl": "https://dataflow.googleapis.com/", - "appName": "FileFormatConversion", - "autoscalingAlgorithm": null, - "containsHeaders": false, - "credentialFactoryClass": "org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory", - "csvFileEncoding": "UTF-8", - "csvFormat": "Default", - "dataflowEndpoint": "", - "dataflowKmsKey": null, - "dataflowServiceOptions": null, - "dataflowWorkerJar": null, - "defaultEnvironmentConfig": null, - "defaultEnvironmentType": null, - "delimiter": ",", - "diskSizeGb": 0, - "enableCloudDebugger": false, - "enableStreamingEngine": false, - "environmentOptions": null, - "experiments": [ - "disable_runner_v2_reason=java_job_google_template", - "enable_always_on_exception_sampling" - ], - "filesToStage": [ - "/template/file-format-conversion/file-format-conversion.jar" - ], - "gcpTempLocation": "gs://dataflow-staging-us-central1-${projectNumber}/tmp", - "gcsPerformanceMetrics": false, - "gcsUploadBufferSizeBytes": null, - "googleApiTrace": null, - "hotKeyLoggingEnabled": false, - "inputFileFormat": "csv", - "inputFileSpec": "gs://config-connector-samples/dataflowflextemplate/numbertest.csv", - "jobName": "dataflowflextemplatejob-${uniqueId}", - "labels": { - "goog-dataflow-provided-template-name": "file_format_conversion", - "goog-dataflow-provided-template-type": "flex" - }, - "logDetailedCsvConversionErrors": false, - "maxNumWorkers": 0, - "network": null, - "numShards": 0, - "numWorkers": 0, - "numberOfWorkerHarnessThreads": 0, - "optionsId": 0, - "outputBucket": "gs://storagebucket-${uniqueId}", - "outputFileFormat": "avro", - "outputFilePrefix": "output", - "overrideWindmillBinary": null, - "pathValidatorClass": "org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator", - "pipelineUrl": "${pipelineUrl}", - "project": "${projectId}", - "recordJfrOnGcThrashing": false, - "region": "us-central1", - "resourceHints": [], - "runner": "org.apache.beam.runners.dataflow.DataflowRunner", - "saveProfilesToGcs": null, - "schema": "gs://config-connector-samples/dataflowflextemplate/numbers.avsc", - "sdkContainerImage": null, - "sdkHarnessContainerImageOverrides": null, - "serviceAccount": "${projectNumber}-compute@developer.gserviceaccount.com", - "stableUniqueNames": "WARNING", - "stagerClass": "org.apache.beam.runners.dataflow.util.GcsStager", - "stagingLocation": "gs://dataflow-staging-us-central1-${projectNumber}/staging", - "streaming": false, - "subnetwork": null, - "tempLocation": "gs://dataflow-staging-us-central1-${projectNumber}/tmp", - "templateLocation": "gs://dataflow-staging-us-central1-${projectNumber}/staging/template_launches/${jobID}/job_object", - "userAgent": "Apache_Beam_SDK_for_Java/2.41.0(JRE_11_environment)", - "workerDiskType": null, - "workerHarnessContainerImage": null, - "workerMachineType": null, - "workerRegion": null, - "zone": null - } - }, - "serviceAccountEmail": "${projectNumber}-compute@developer.gserviceaccount.com", - "shuffleMode": 2, - "tempStoragePrefix": "storage.googleapis.com/dataflow-staging-us-central1-${projectNumber}/tmp", - "userAgent": { - "removed": "simplicity" - }, - "version": { - "removed": "simplicity" - }, - "workerPools": [] - }, - "id": "000000000000000000000", - "jobMetadata": { - "removed": "simplicity" - }, - "labels": { - "goog-dataflow-provided-template-name": "file_format_conversion", - "goog-dataflow-provided-template-type": "flex" - }, - "location": "us-central1", - "name": "dataflowflextemplatejob-${uniqueId}", - "pipelineDescription": { - "removed": "simplicity" - }, - "projectId": "${projectId}", - "startTime": "2024-04-01T12:34:56.123456Z", - "steps": [], - "type": 1 -} - ---- - -GET https://dataflow.googleapis.com/v1b3/projects/${projectId}/locations/us-central1/jobs/${jobID}?%24alt=json%3Benum-encoding%3Dint&view=JOB_VIEW_SUMMARY -Content-Type: application/json -User-Agent: kcc/controller-manager -x-goog-request-params: project_id=${projectId}&location=us-central1&job_id=${jobID} - -200 OK -Cache-Control: private -Content-Type: application/json; charset=UTF-8 -Server: ESF -Vary: Origin -Vary: X-Origin -Vary: Referer -X-Content-Type-Options: nosniff -X-Frame-Options: SAMEORIGIN -X-Xss-Protection: 0 - { "createTime": "2024-04-01T12:34:56.123456Z", "currentState": 2, @@ -580,391 +402,6 @@ X-Xss-Protection: 0 --- -PUT https://dataflow.googleapis.com/v1b3/projects/${projectId}/locations/us-central1/jobs/${jobID}?%24alt=json%3Benum-encoding%3Dint -Content-Type: application/json -User-Agent: kcc/controller-manager -x-goog-request-params: project_id=${projectId}&location=us-central1&job_id=${jobID} - -{ - "requestedState": 5 -} - -200 OK -Cache-Control: private -Content-Type: application/json; charset=UTF-8 -Server: ESF -Vary: Origin -Vary: X-Origin -Vary: Referer -X-Content-Type-Options: nosniff -X-Frame-Options: SAMEORIGIN -X-Xss-Protection: 0 - -{ - "type": 1 -} - ---- - -GET https://dataflow.googleapis.com/v1b3/projects/${projectId}/locations/us-central1/jobs/${jobID}?%24alt=json%3Benum-encoding%3Dint&view=JOB_VIEW_SUMMARY -Content-Type: application/json -User-Agent: kcc/controller-manager -x-goog-request-params: project_id=${projectId}&location=us-central1&job_id=${jobID} - -200 OK -Cache-Control: private -Content-Type: application/json; charset=UTF-8 -Server: ESF -Vary: Origin -Vary: X-Origin -Vary: Referer -X-Content-Type-Options: nosniff -X-Frame-Options: SAMEORIGIN -X-Xss-Protection: 0 - -{ - "createTime": "2024-04-01T12:34:56.123456Z", - "currentState": 10, - "currentStateTime": "2024-04-01T12:34:56.123456Z", - "environment": { - "dataset": "bigquery.googleapis.com/cloud_dataflow", - "experiments": [ - "auto_google_template_runner_v2", - "auto_high_core_runner_v2", - "auto_runner_v2_min_sdk=2.54.0", - "configure_shuffle_service_addresses_in_control_plane", - "delayed_launch", - "disable_baggins_exp", - "disable_primeflex", - "disable_runner_v2_reason=java_job_google_template", - "ek_regions=", - "enable_always_on_exception_sampling", - "enable_async_job_creation", - "enable_billing_v_1_5", - "enable_cloud_permissions_checking", - "enable_cmek_org_policy_check", - "enable_compute_default_service_account_org_policy", - "enable_data_sampling_telemetry", - "enable_dataprep_new_billing", - "enable_fnapi_multimap_side_input_bulk_read", - "enable_memory_sampler", - "enable_oom_sampler", - "enable_recommendations", - "enable_remote_image_ping", - "enable_secure_boot", - "enable_throttled_based_rescaling", - "enable_worker_cloud_monitoring_exporter", - "enable_worker_disk_cloud_monitoring", - "enable_worker_memory_cloud_monitoring", - "enable_zonal_outage_aware_routing", - "limit_preemptible_worker_pct", - "limit_resizing_by_cpu_util", - "min_sdk_version_to_reject_worker_in_different_region_than_service=2.44.0", - "override_controller_service_account", - "primeflex_slow_start_pct=5", - "primeflex_slow_start_seconds=3600", - "regional_physical_zone_separation_enabled", - "shuffle_mode=auto", - "shuffle_service_address_type=DIRECTPATH_WITH_CFE_FALLBACK", - "sideinput_io_metrics", - "use_dataflow_service_account_in_igm", - "use_e2_for_default_machine_type_worker_regions=africa-south1,europe-north2,europe-southwest1,europe-west10,europe-west12,europe-west8,europe-west9,me-central1,me-central2,me-west1,northamerica-south1,southamerica-west1,us-east10,us-east5,us-east7,us-south1,us-west8", - "use_job_admission_controller", - "use_multi_hop_delegation", - "use_templates_regional_bucket", - "use_worker_zone_chooser_by_default" - ], - "sdkPipelineOptions": { - "display_data": [], - "options": { - "apiRootUrl": "https://dataflow.googleapis.com/", - "appName": "FileFormatConversion", - "autoscalingAlgorithm": null, - "containsHeaders": false, - "credentialFactoryClass": "org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory", - "csvFileEncoding": "UTF-8", - "csvFormat": "Default", - "dataflowEndpoint": "", - "dataflowKmsKey": null, - "dataflowServiceOptions": null, - "dataflowWorkerJar": null, - "defaultEnvironmentConfig": null, - "defaultEnvironmentType": null, - "delimiter": ",", - "diskSizeGb": 0, - "enableCloudDebugger": false, - "enableStreamingEngine": false, - "environmentOptions": null, - "experiments": [ - "disable_runner_v2_reason=java_job_google_template", - "enable_always_on_exception_sampling" - ], - "filesToStage": [ - "/template/file-format-conversion/file-format-conversion.jar" - ], - "gcpTempLocation": "gs://dataflow-staging-us-central1-${projectNumber}/tmp", - "gcsPerformanceMetrics": false, - "gcsUploadBufferSizeBytes": null, - "googleApiTrace": null, - "hotKeyLoggingEnabled": false, - "inputFileFormat": "csv", - "inputFileSpec": "gs://config-connector-samples/dataflowflextemplate/numbertest.csv", - "jobName": "dataflowflextemplatejob-${uniqueId}", - "labels": { - "goog-dataflow-provided-template-name": "file_format_conversion", - "goog-dataflow-provided-template-type": "flex" - }, - "logDetailedCsvConversionErrors": false, - "maxNumWorkers": 0, - "network": null, - "numShards": 0, - "numWorkers": 0, - "numberOfWorkerHarnessThreads": 0, - "optionsId": 0, - "outputBucket": "gs://storagebucket-${uniqueId}", - "outputFileFormat": "avro", - "outputFilePrefix": "output", - "overrideWindmillBinary": null, - "pathValidatorClass": "org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator", - "pipelineUrl": "${pipelineUrl}", - "project": "${projectId}", - "recordJfrOnGcThrashing": false, - "region": "us-central1", - "resourceHints": [], - "runner": "org.apache.beam.runners.dataflow.DataflowRunner", - "saveProfilesToGcs": null, - "schema": "gs://config-connector-samples/dataflowflextemplate/numbers.avsc", - "sdkContainerImage": null, - "sdkHarnessContainerImageOverrides": null, - "serviceAccount": "${projectNumber}-compute@developer.gserviceaccount.com", - "stableUniqueNames": "WARNING", - "stagerClass": "org.apache.beam.runners.dataflow.util.GcsStager", - "stagingLocation": "gs://dataflow-staging-us-central1-${projectNumber}/staging", - "streaming": false, - "subnetwork": null, - "tempLocation": "gs://dataflow-staging-us-central1-${projectNumber}/tmp", - "templateLocation": "gs://dataflow-staging-us-central1-${projectNumber}/staging/template_launches/${jobID}/job_object", - "userAgent": "Apache_Beam_SDK_for_Java/2.41.0(JRE_11_environment)", - "workerDiskType": null, - "workerHarnessContainerImage": null, - "workerMachineType": null, - "workerRegion": null, - "zone": null - } - }, - "serviceAccountEmail": "${projectNumber}-compute@developer.gserviceaccount.com", - "shuffleMode": 2, - "tempStoragePrefix": "storage.googleapis.com/dataflow-staging-us-central1-${projectNumber}/tmp", - "userAgent": { - "removed": "simplicity" - }, - "version": { - "removed": "simplicity" - }, - "workerPools": [] - }, - "id": "000000000000000000000", - "jobMetadata": { - "removed": "simplicity" - }, - "labels": { - "goog-dataflow-provided-template-name": "file_format_conversion", - "goog-dataflow-provided-template-type": "flex" - }, - "location": "us-central1", - "name": "dataflowflextemplatejob-${uniqueId}", - "pipelineDescription": { - "removed": "simplicity" - }, - "projectId": "${projectId}", - "requestedState": 5, - "stageStates": [], - "startTime": "2024-04-01T12:34:56.123456Z", - "steps": [], - "type": 1 -} - ---- - -GET https://dataflow.googleapis.com/v1b3/projects/${projectId}/locations/us-central1/jobs/${jobID}?%24alt=json%3Benum-encoding%3Dint&view=JOB_VIEW_SUMMARY -Content-Type: application/json -User-Agent: kcc/controller-manager -x-goog-request-params: project_id=${projectId}&location=us-central1&job_id=${jobID} - -200 OK -Cache-Control: private -Content-Type: application/json; charset=UTF-8 -Server: ESF -Vary: Origin -Vary: X-Origin -Vary: Referer -X-Content-Type-Options: nosniff -X-Frame-Options: SAMEORIGIN -X-Xss-Protection: 0 - -{ - "createTime": "2024-04-01T12:34:56.123456Z", - "currentState": 5, - "currentStateTime": "2024-04-01T12:34:56.123456Z", - "environment": { - "dataset": "bigquery.googleapis.com/cloud_dataflow", - "experiments": [ - "auto_google_template_runner_v2", - "auto_high_core_runner_v2", - "auto_runner_v2_min_sdk=2.54.0", - "configure_shuffle_service_addresses_in_control_plane", - "delayed_launch", - "disable_baggins_exp", - "disable_primeflex", - "disable_runner_v2_reason=java_job_google_template", - "ek_regions=", - "enable_always_on_exception_sampling", - "enable_async_job_creation", - "enable_billing_v_1_5", - "enable_cloud_permissions_checking", - "enable_cmek_org_policy_check", - "enable_compute_default_service_account_org_policy", - "enable_data_sampling_telemetry", - "enable_dataprep_new_billing", - "enable_fnapi_multimap_side_input_bulk_read", - "enable_memory_sampler", - "enable_oom_sampler", - "enable_recommendations", - "enable_remote_image_ping", - "enable_secure_boot", - "enable_throttled_based_rescaling", - "enable_worker_cloud_monitoring_exporter", - "enable_worker_disk_cloud_monitoring", - "enable_worker_memory_cloud_monitoring", - "enable_zonal_outage_aware_routing", - "limit_preemptible_worker_pct", - "limit_resizing_by_cpu_util", - "min_sdk_version_to_reject_worker_in_different_region_than_service=2.44.0", - "override_controller_service_account", - "primeflex_slow_start_pct=5", - "primeflex_slow_start_seconds=3600", - "regional_physical_zone_separation_enabled", - "shuffle_mode=auto", - "shuffle_service_address_type=DIRECTPATH_WITH_CFE_FALLBACK", - "sideinput_io_metrics", - "use_dataflow_service_account_in_igm", - "use_e2_for_default_machine_type_worker_regions=africa-south1,europe-north2,europe-southwest1,europe-west10,europe-west12,europe-west8,europe-west9,me-central1,me-central2,me-west1,northamerica-south1,southamerica-west1,us-east10,us-east5,us-east7,us-south1,us-west8", - "use_job_admission_controller", - "use_multi_hop_delegation", - "use_templates_regional_bucket", - "use_worker_zone_chooser_by_default" - ], - "sdkPipelineOptions": { - "display_data": [], - "options": { - "apiRootUrl": "https://dataflow.googleapis.com/", - "appName": "FileFormatConversion", - "autoscalingAlgorithm": null, - "containsHeaders": false, - "credentialFactoryClass": "org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory", - "csvFileEncoding": "UTF-8", - "csvFormat": "Default", - "dataflowEndpoint": "", - "dataflowKmsKey": null, - "dataflowServiceOptions": null, - "dataflowWorkerJar": null, - "defaultEnvironmentConfig": null, - "defaultEnvironmentType": null, - "delimiter": ",", - "diskSizeGb": 0, - "enableCloudDebugger": false, - "enableStreamingEngine": false, - "environmentOptions": null, - "experiments": [ - "disable_runner_v2_reason=java_job_google_template", - "enable_always_on_exception_sampling" - ], - "filesToStage": [ - "/template/file-format-conversion/file-format-conversion.jar" - ], - "gcpTempLocation": "gs://dataflow-staging-us-central1-${projectNumber}/tmp", - "gcsPerformanceMetrics": false, - "gcsUploadBufferSizeBytes": null, - "googleApiTrace": null, - "hotKeyLoggingEnabled": false, - "inputFileFormat": "csv", - "inputFileSpec": "gs://config-connector-samples/dataflowflextemplate/numbertest.csv", - "jobName": "dataflowflextemplatejob-${uniqueId}", - "labels": { - "goog-dataflow-provided-template-name": "file_format_conversion", - "goog-dataflow-provided-template-type": "flex" - }, - "logDetailedCsvConversionErrors": false, - "maxNumWorkers": 0, - "network": null, - "numShards": 0, - "numWorkers": 0, - "numberOfWorkerHarnessThreads": 0, - "optionsId": 0, - "outputBucket": "gs://storagebucket-${uniqueId}", - "outputFileFormat": "avro", - "outputFilePrefix": "output", - "overrideWindmillBinary": null, - "pathValidatorClass": "org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator", - "pipelineUrl": "${pipelineUrl}", - "project": "${projectId}", - "recordJfrOnGcThrashing": false, - "region": "us-central1", - "resourceHints": [], - "runner": "org.apache.beam.runners.dataflow.DataflowRunner", - "saveProfilesToGcs": null, - "schema": "gs://config-connector-samples/dataflowflextemplate/numbers.avsc", - "sdkContainerImage": null, - "sdkHarnessContainerImageOverrides": null, - "serviceAccount": "${projectNumber}-compute@developer.gserviceaccount.com", - "stableUniqueNames": "WARNING", - "stagerClass": "org.apache.beam.runners.dataflow.util.GcsStager", - "stagingLocation": "gs://dataflow-staging-us-central1-${projectNumber}/staging", - "streaming": false, - "subnetwork": null, - "tempLocation": "gs://dataflow-staging-us-central1-${projectNumber}/tmp", - "templateLocation": "gs://dataflow-staging-us-central1-${projectNumber}/staging/template_launches/${jobID}/job_object", - "userAgent": "Apache_Beam_SDK_for_Java/2.41.0(JRE_11_environment)", - "workerDiskType": null, - "workerHarnessContainerImage": null, - "workerMachineType": null, - "workerRegion": null, - "zone": null - } - }, - "serviceAccountEmail": "${projectNumber}-compute@developer.gserviceaccount.com", - "shuffleMode": 2, - "tempStoragePrefix": "storage.googleapis.com/dataflow-staging-us-central1-${projectNumber}/tmp", - "userAgent": { - "removed": "simplicity" - }, - "version": { - "removed": "simplicity" - }, - "workerPools": [] - }, - "id": "000000000000000000000", - "jobMetadata": { - "removed": "simplicity" - }, - "labels": { - "goog-dataflow-provided-template-name": "file_format_conversion", - "goog-dataflow-provided-template-type": "flex" - }, - "location": "us-central1", - "name": "dataflowflextemplatejob-${uniqueId}", - "pipelineDescription": { - "removed": "simplicity" - }, - "projectId": "${projectId}", - "stageStates": [], - "startTime": "2024-04-01T12:34:56.123456Z", - "steps": [], - "type": 1 -} - ---- - GET https://storage.googleapis.com/storage/v1/b/storagebucket-${uniqueId}?alt=json&prettyPrint=false User-Agent: google-api-go-client/0.5 Terraform/ (+https://www.terraform.io) Terraform-Plugin-SDK/2.10.1 terraform-provider-google-beta/kcc/controller-manager diff --git a/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/streamingdataflowflextemplatejob/_generated_object_streamingdataflowflextemplatejob.golden.yaml b/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/streamingdataflowflextemplatejob/_generated_object_streamingdataflowflextemplatejob.golden.yaml index 2aa52761319..34ac5728832 100644 --- a/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/streamingdataflowflextemplatejob/_generated_object_streamingdataflowflextemplatejob.golden.yaml +++ b/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/streamingdataflowflextemplatejob/_generated_object_streamingdataflowflextemplatejob.golden.yaml @@ -3,7 +3,7 @@ kind: DataflowFlexTemplateJob metadata: annotations: cnrm.cloud.google.com/management-conflict-prevention-policy: none - cnrm.cloud.google.com/mutable-but-unreadable-fields: '{"spec":{"containerSpecGcsPath":"gs://dataflow-templates/2020-08-31-00_RC00/flex/PubSub_Avro_to_BigQuery","parameters":{"createDisposition":"CREATE_NEVER","inputSubscription":"projects/${projectId}/subscriptions/pubsubsubscription-${uniqueId}","outputTableSpec":"${projectId}:bigquerydataset${uniqueId}.bigquerytable${uniqueId}","outputTopic":"projects/${projectId}/topics/pubsubtopic1-${uniqueId}","schemaPath":"gs://config-connector-samples/dataflowflextemplate/numbers.avsc"}}}' + cnrm.cloud.google.com/mutable-but-unreadable-fields: '{"spec":{"containerSpecGcsPath":"gs://dataflow-templates/2020-08-31-00_RC00/flex/PubSub_Avro_to_BigQuery","parameters":{"createDisposition":"CREATE_NEVER","inputSubscription":"projects/${projectId}/subscriptions/pubsubsubscription-${uniqueId}","outputTableSpec":"${projectId}:bigquerydataset${uniqueId}.bigquerytable${uniqueId}","outputTopic":"projects/${projectId}/topics/pubsubtopic0-${uniqueId}","schemaPath":"gs://config-connector-samples/dataflowflextemplate/numbers.avsc"}}}' cnrm.cloud.google.com/on-delete: cancel cnrm.cloud.google.com/project-id: ${projectId} cnrm.cloud.google.com/state-into-spec: merge @@ -26,7 +26,7 @@ spec: createDisposition: CREATE_NEVER inputSubscription: projects/${projectId}/subscriptions/pubsubsubscription-${uniqueId} outputTableSpec: ${projectId}:bigquerydataset${uniqueId}.bigquerytable${uniqueId} - outputTopic: projects/${projectId}/topics/pubsubtopic1-${uniqueId} + outputTopic: projects/${projectId}/topics/pubsubtopic0-${uniqueId} schemaPath: gs://config-connector-samples/dataflowflextemplate/numbers.avsc region: us-central1 serviceAccountEmailRef: diff --git a/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/streamingdataflowflextemplatejob/_http.log b/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/streamingdataflowflextemplatejob/_http.log index ac845b9f507..d63f534bb0a 100644 --- a/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/streamingdataflowflextemplatejob/_http.log +++ b/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/streamingdataflowflextemplatejob/_http.log @@ -582,7 +582,7 @@ User-Agent: google-api-go-client/0.5 Terraform/ (+https://www.terraform.io) Terr "createDisposition": "CREATE_NEVER", "inputSubscription": "projects/${projectId}/subscriptions/pubsubsubscription-${uniqueId}", "outputTableSpec": "${projectId}:bigquerydataset${uniqueId}.bigquerytable${uniqueId}", - "outputTopic": "projects/${projectId}/topics/pubsubtopic1-${uniqueId}", + "outputTopic": "projects/${projectId}/topics/pubsubtopic0-${uniqueId}", "schemaPath": "gs://config-connector-samples/dataflowflextemplate/numbers.avsc" } } diff --git a/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/streamingdataflowflextemplatejob/create.yaml b/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/streamingdataflowflextemplatejob/create.yaml index f3073855b89..ad3c60f7b7b 100644 --- a/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/streamingdataflowflextemplatejob/create.yaml +++ b/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/streamingdataflowflextemplatejob/create.yaml @@ -28,6 +28,6 @@ spec: # This is maintained by us. schemaPath: gs://config-connector-samples/dataflowflextemplate/numbers.avsc inputSubscription: projects/${projectId}/subscriptions/pubsubsubscription-${uniqueId} - outputTopic: projects/${projectId}/topics/pubsubtopic1-${uniqueId} + outputTopic: projects/${projectId}/topics/pubsubtopic0-${uniqueId} outputTableSpec: ${projectId}:bigquerydataset${uniqueId}.bigquerytable${uniqueId} createDisposition: CREATE_NEVER