From 27abe69cdb552b0812a5af9ff9d225e903550347 Mon Sep 17 00:00:00 2001 From: Steph Date: Tue, 12 Apr 2022 17:05:44 +0200 Subject: [PATCH 1/3] add new resource azurerm_stream_analytics_job_schedule --- .../parse/streaming_job_schedule.go | 75 +++++ .../parse/streaming_job_schedule_test.go | 128 +++++++++ .../services/streamanalytics/registration.go | 5 +- .../services/streamanalytics/resourceids.go | 1 + .../stream_analytics_job_data_source.go | 33 ++- .../stream_analytics_job_data_source_test.go | 29 ++ .../stream_analytics_job_resource.go | 4 + .../stream_analytics_job_schedule_resource.go | 269 ++++++++++++++++++ ...am_analytics_job_schedule_resource_test.go | 230 +++++++++++++++ .../stream_analytics_output_blob_resource.go | 1 + ...am_analytics_stream_input_blob_resource.go | 21 +- .../streamanalytics/testdata/chonkdata.csv | 4 + .../validate/streaming_job_schedule_id.go | 23 ++ .../streaming_job_schedule_id_test.go | 88 ++++++ .../docs/d/stream_analytics_job.html.markdown | 12 +- ...tream_analytics_job_schedule.html.markdown | 83 ++++++ 16 files changed, 987 insertions(+), 19 deletions(-) create mode 100644 internal/services/streamanalytics/parse/streaming_job_schedule.go create mode 100644 internal/services/streamanalytics/parse/streaming_job_schedule_test.go create mode 100644 internal/services/streamanalytics/stream_analytics_job_schedule_resource.go create mode 100644 internal/services/streamanalytics/stream_analytics_job_schedule_resource_test.go create mode 100644 internal/services/streamanalytics/testdata/chonkdata.csv create mode 100644 internal/services/streamanalytics/validate/streaming_job_schedule_id.go create mode 100644 internal/services/streamanalytics/validate/streaming_job_schedule_id_test.go create mode 100644 website/docs/r/stream_analytics_job_schedule.html.markdown diff --git a/internal/services/streamanalytics/parse/streaming_job_schedule.go b/internal/services/streamanalytics/parse/streaming_job_schedule.go new file mode 100644 index 000000000000..754ab8e9d6a3 --- /dev/null +++ b/internal/services/streamanalytics/parse/streaming_job_schedule.go @@ -0,0 +1,75 @@ +package parse + +// NOTE: this file is generated via 'go:generate' - manual changes will be overwritten + +import ( + "fmt" + "strings" + + "github.com/hashicorp/go-azure-helpers/resourcemanager/resourceids" +) + +type StreamingJobScheduleId struct { + SubscriptionId string + ResourceGroup string + StreamingjobName string + ScheduleName string +} + +func NewStreamingJobScheduleID(subscriptionId, resourceGroup, streamingjobName, scheduleName string) StreamingJobScheduleId { + return StreamingJobScheduleId{ + SubscriptionId: subscriptionId, + ResourceGroup: resourceGroup, + StreamingjobName: streamingjobName, + ScheduleName: scheduleName, + } +} + +func (id StreamingJobScheduleId) String() string { + segments := []string{ + fmt.Sprintf("Schedule Name %q", id.ScheduleName), + fmt.Sprintf("Streamingjob Name %q", id.StreamingjobName), + fmt.Sprintf("Resource Group %q", id.ResourceGroup), + } + segmentsStr := strings.Join(segments, " / ") + return fmt.Sprintf("%s: (%s)", "Streaming Job Schedule", segmentsStr) +} + +func (id StreamingJobScheduleId) ID() string { + fmtString := "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.StreamAnalytics/streamingjobs/%s/schedule/%s" + return fmt.Sprintf(fmtString, id.SubscriptionId, id.ResourceGroup, id.StreamingjobName, id.ScheduleName) +} + +// StreamingJobScheduleID parses a StreamingJobSchedule ID into an StreamingJobScheduleId struct +func StreamingJobScheduleID(input string) (*StreamingJobScheduleId, error) { + id, err := resourceids.ParseAzureResourceID(input) + if err != nil { + return nil, err + } + + resourceId := StreamingJobScheduleId{ + SubscriptionId: id.SubscriptionID, + ResourceGroup: id.ResourceGroup, + } + + if resourceId.SubscriptionId == "" { + return nil, fmt.Errorf("ID was missing the 'subscriptions' element") + } + + if resourceId.ResourceGroup == "" { + return nil, fmt.Errorf("ID was missing the 'resourceGroups' element") + } + + if resourceId.StreamingjobName, err = id.PopSegment("streamingjobs"); err != nil { + return nil, err + } + if resourceId.ScheduleName, err = id.PopSegment("schedule"); err != nil { + return nil, err + } + + if err := id.ValidateNoEmptySegments(input); err != nil { + return nil, err + } + + return &resourceId, nil +} diff --git a/internal/services/streamanalytics/parse/streaming_job_schedule_test.go b/internal/services/streamanalytics/parse/streaming_job_schedule_test.go new file mode 100644 index 000000000000..68ddff5c21da --- /dev/null +++ b/internal/services/streamanalytics/parse/streaming_job_schedule_test.go @@ -0,0 +1,128 @@ +package parse + +// NOTE: this file is generated via 'go:generate' - manual changes will be overwritten + +import ( + "testing" + + "github.com/hashicorp/go-azure-helpers/resourcemanager/resourceids" +) + +var _ resourceids.Id = StreamingJobScheduleId{} + +func TestStreamingJobScheduleIDFormatter(t *testing.T) { + actual := NewStreamingJobScheduleID("12345678-1234-9876-4563-123456789012", "resGroup1", "streamingJob1", "default").ID() + expected := "/subscriptions/12345678-1234-9876-4563-123456789012/resourceGroups/resGroup1/providers/Microsoft.StreamAnalytics/streamingjobs/streamingJob1/schedule/default" + if actual != expected { + t.Fatalf("Expected %q but got %q", expected, actual) + } +} + +func TestStreamingJobScheduleID(t *testing.T) { + testData := []struct { + Input string + Error bool + Expected *StreamingJobScheduleId + }{ + + { + // empty + Input: "", + Error: true, + }, + + { + // missing SubscriptionId + Input: "/", + Error: true, + }, + + { + // missing value for SubscriptionId + Input: "/subscriptions/", + Error: true, + }, + + { + // missing ResourceGroup + Input: "/subscriptions/12345678-1234-9876-4563-123456789012/", + Error: true, + }, + + { + // missing value for ResourceGroup + Input: "/subscriptions/12345678-1234-9876-4563-123456789012/resourceGroups/", + Error: true, + }, + + { + // missing StreamingjobName + Input: "/subscriptions/12345678-1234-9876-4563-123456789012/resourceGroups/resGroup1/providers/Microsoft.StreamAnalytics/", + Error: true, + }, + + { + // missing value for StreamingjobName + Input: "/subscriptions/12345678-1234-9876-4563-123456789012/resourceGroups/resGroup1/providers/Microsoft.StreamAnalytics/streamingjobs/", + Error: true, + }, + + { + // missing ScheduleName + Input: "/subscriptions/12345678-1234-9876-4563-123456789012/resourceGroups/resGroup1/providers/Microsoft.StreamAnalytics/streamingjobs/streamingJob1/", + Error: true, + }, + + { + // missing value for ScheduleName + Input: "/subscriptions/12345678-1234-9876-4563-123456789012/resourceGroups/resGroup1/providers/Microsoft.StreamAnalytics/streamingjobs/streamingJob1/schedule/", + Error: true, + }, + + { + // valid + Input: "/subscriptions/12345678-1234-9876-4563-123456789012/resourceGroups/resGroup1/providers/Microsoft.StreamAnalytics/streamingjobs/streamingJob1/schedule/default", + Expected: &StreamingJobScheduleId{ + SubscriptionId: "12345678-1234-9876-4563-123456789012", + ResourceGroup: "resGroup1", + StreamingjobName: "streamingJob1", + ScheduleName: "default", + }, + }, + + { + // upper-cased + Input: "/SUBSCRIPTIONS/12345678-1234-9876-4563-123456789012/RESOURCEGROUPS/RESGROUP1/PROVIDERS/MICROSOFT.STREAMANALYTICS/STREAMINGJOBS/STREAMINGJOB1/SCHEDULE/DEFAULT", + Error: true, + }, + } + + for _, v := range testData { + t.Logf("[DEBUG] Testing %q", v.Input) + + actual, err := StreamingJobScheduleID(v.Input) + if err != nil { + if v.Error { + continue + } + + t.Fatalf("Expect a value but got an error: %s", err) + } + if v.Error { + t.Fatal("Expect an error but didn't get one") + } + + if actual.SubscriptionId != v.Expected.SubscriptionId { + t.Fatalf("Expected %q but got %q for SubscriptionId", v.Expected.SubscriptionId, actual.SubscriptionId) + } + if actual.ResourceGroup != v.Expected.ResourceGroup { + t.Fatalf("Expected %q but got %q for ResourceGroup", v.Expected.ResourceGroup, actual.ResourceGroup) + } + if actual.StreamingjobName != v.Expected.StreamingjobName { + t.Fatalf("Expected %q but got %q for StreamingjobName", v.Expected.StreamingjobName, actual.StreamingjobName) + } + if actual.ScheduleName != v.Expected.ScheduleName { + t.Fatalf("Expected %q but got %q for ScheduleName", v.Expected.ScheduleName, actual.ScheduleName) + } + } +} diff --git a/internal/services/streamanalytics/registration.go b/internal/services/streamanalytics/registration.go index 5cb9684327c1..6aaf249b5367 100644 --- a/internal/services/streamanalytics/registration.go +++ b/internal/services/streamanalytics/registration.go @@ -22,10 +22,11 @@ func (r Registration) DataSources() []sdk.DataSource { func (r Registration) Resources() []sdk.Resource { return []sdk.Resource{ - OutputFunctionResource{}, - OutputTableResource{}, ClusterResource{}, + JobScheduleResource{}, ManagedPrivateEndpointResource{}, + OutputFunctionResource{}, + OutputTableResource{}, } } diff --git a/internal/services/streamanalytics/resourceids.go b/internal/services/streamanalytics/resourceids.go index 55ff29dbbfe8..82d9f3b0e882 100644 --- a/internal/services/streamanalytics/resourceids.go +++ b/internal/services/streamanalytics/resourceids.go @@ -2,6 +2,7 @@ package streamanalytics //go:generate go run ../../tools/generator-resource-id/main.go -path=./ -name=Function -id=/subscriptions/12345678-1234-9876-4563-123456789012/resourceGroups/resGroup1/providers/Microsoft.StreamAnalytics/streamingjobs/streamingJob1/functions/function1 //go:generate go run ../../tools/generator-resource-id/main.go -path=./ -name=StreamingJob -id=/subscriptions/12345678-1234-9876-4563-123456789012/resourceGroups/resGroup1/providers/Microsoft.StreamAnalytics/streamingjobs/streamingJob1 +//go:generate go run ../../tools/generator-resource-id/main.go -path=./ -name=StreamingJobSchedule -id=/subscriptions/12345678-1234-9876-4563-123456789012/resourceGroups/resGroup1/providers/Microsoft.StreamAnalytics/streamingjobs/streamingJob1/schedule/default //go:generate go run ../../tools/generator-resource-id/main.go -path=./ -name=StreamInput -id=/subscriptions/12345678-1234-9876-4563-123456789012/resourceGroups/resGroup1/providers/Microsoft.StreamAnalytics/streamingjobs/streamingJob1/inputs/streamInput1 //go:generate go run ../../tools/generator-resource-id/main.go -path=./ -name=Output -id=/subscriptions/12345678-1234-9876-4563-123456789012/resourceGroups/resGroup1/providers/Microsoft.StreamAnalytics/streamingjobs/streamingJob1/outputs/output1 //go:generate go run ../../tools/generator-resource-id/main.go -path=./ -name=Cluster -id=/subscriptions/12345678-1234-9876-4563-123456789012/resourceGroups/resGroup1/providers/Microsoft.StreamAnalytics/clusters/cluster1 diff --git a/internal/services/streamanalytics/stream_analytics_job_data_source.go b/internal/services/streamanalytics/stream_analytics_job_data_source.go index 75fcb886a2e9..cbda48dfa6f3 100644 --- a/internal/services/streamanalytics/stream_analytics_job_data_source.go +++ b/internal/services/streamanalytics/stream_analytics_job_data_source.go @@ -63,11 +63,26 @@ func dataSourceStreamAnalyticsJob() *pluginsdk.Resource { "identity": commonschema.SystemAssignedIdentityComputed(), + "last_output_time": { + Type: pluginsdk.TypeString, + Computed: true, + }, + "output_error_policy": { Type: pluginsdk.TypeString, Computed: true, }, + "start_mode": { + Type: pluginsdk.TypeString, + Computed: true, + }, + + "start_time": { + Type: pluginsdk.TypeString, + Computed: true, + }, + "streaming_units": { Type: pluginsdk.TypeInt, Computed: true, @@ -110,16 +125,26 @@ func dataSourceStreamAnalyticsJobRead(d *pluginsdk.ResourceData, meta interface{ if props := resp.StreamingJobProperties; props != nil { d.Set("compatibility_level", string(props.CompatibilityLevel)) d.Set("data_locale", props.DataLocale) - if props.EventsLateArrivalMaxDelayInSeconds != nil { - d.Set("events_late_arrival_max_delay_in_seconds", int(*props.EventsLateArrivalMaxDelayInSeconds)) + if v := props.EventsLateArrivalMaxDelayInSeconds; v != nil { + d.Set("events_late_arrival_max_delay_in_seconds", int(*v)) } - if props.EventsOutOfOrderMaxDelayInSeconds != nil { - d.Set("events_out_of_order_max_delay_in_seconds", int(*props.EventsOutOfOrderMaxDelayInSeconds)) + if v := props.EventsOutOfOrderMaxDelayInSeconds; v != nil { + d.Set("events_out_of_order_max_delay_in_seconds", int(*v)) } d.Set("events_out_of_order_policy", string(props.EventsOutOfOrderPolicy)) d.Set("job_id", props.JobID) d.Set("output_error_policy", string(props.OutputErrorPolicy)) + if v := props.LastOutputEventTime; v != nil { + d.Set("last_output_time", v.String()) + } + + if v := props.OutputStartTime; v != nil { + d.Set("start_time", v.String()) + } + + d.Set("start_mode", props.OutputStartMode) + if props.Transformation != nil && props.Transformation.TransformationProperties != nil { d.Set("streaming_units", props.Transformation.TransformationProperties.StreamingUnits) d.Set("transformation_query", props.Transformation.TransformationProperties.Query) diff --git a/internal/services/streamanalytics/stream_analytics_job_data_source_test.go b/internal/services/streamanalytics/stream_analytics_job_data_source_test.go index 5e0e1db27249..0ad22df99e91 100644 --- a/internal/services/streamanalytics/stream_analytics_job_data_source_test.go +++ b/internal/services/streamanalytics/stream_analytics_job_data_source_test.go @@ -42,6 +42,21 @@ func TestAccDataSourceStreamAnalyticsJob_identity(t *testing.T) { }) } +func TestAccDataSourceStreamAnalyticsJob_jobSchedule(t *testing.T) { + data := acceptance.BuildTestData(t, "data.azurerm_stream_analytics_job", "test") + + data.DataSourceTest(t, []acceptance.TestStep{ + { + Config: StreamAnalyticsJobDataSource{}.jobSchedule(data), + Check: acceptance.ComposeTestCheckFunc( + check.That(data.ResourceName).Key("start_mode").Exists(), + check.That(data.ResourceName).Key("start_time").Exists(), + check.That(data.ResourceName).Key("last_output_time").Exists(), + ), + }, + }) +} + func (d StreamAnalyticsJobDataSource) basic(data acceptance.TestData) string { config := StreamAnalyticsJobResource{}.basic(data) return fmt.Sprintf(` @@ -65,3 +80,17 @@ data "azurerm_stream_analytics_job" "test" { } `, config) } + +func (d StreamAnalyticsJobDataSource) jobSchedule(data acceptance.TestData) string { + config := StreamAnalyticsJobScheduleResource{}.lastOutputEventTime(data) + return fmt.Sprintf(` +%s + +data "azurerm_stream_analytics_job" "test" { + name = azurerm_stream_analytics_job.test.name + resource_group_name = azurerm_stream_analytics_job.test.resource_group_name + + depends_on = [azurerm_stream_analytics_job_schedule.test] +} +`, config) +} diff --git a/internal/services/streamanalytics/stream_analytics_job_resource.go b/internal/services/streamanalytics/stream_analytics_job_resource.go index d00f3cb6af7d..dcd6c7f9fcce 100644 --- a/internal/services/streamanalytics/stream_analytics_job_resource.go +++ b/internal/services/streamanalytics/stream_analytics_job_resource.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/terraform-provider-azurerm/helpers/azure" "github.com/hashicorp/terraform-provider-azurerm/helpers/tf" "github.com/hashicorp/terraform-provider-azurerm/internal/clients" + "github.com/hashicorp/terraform-provider-azurerm/internal/locks" "github.com/hashicorp/terraform-provider-azurerm/internal/services/streamanalytics/parse" "github.com/hashicorp/terraform-provider-azurerm/internal/services/streamanalytics/validate" "github.com/hashicorp/terraform-provider-azurerm/internal/tags" @@ -146,6 +147,9 @@ func resourceStreamAnalyticsJobCreateUpdate(d *pluginsdk.ResourceData, meta inte id := parse.NewStreamingJobID(subscriptionId, d.Get("resource_group_name").(string), d.Get("name").(string)) + locks.ByID(id.ID()) + defer locks.UnlockByID(id.ID()) + if d.IsNewResource() { existing, err := client.Get(ctx, id.ResourceGroup, id.Name, "") if err != nil { diff --git a/internal/services/streamanalytics/stream_analytics_job_schedule_resource.go b/internal/services/streamanalytics/stream_analytics_job_schedule_resource.go new file mode 100644 index 000000000000..392dfe92134a --- /dev/null +++ b/internal/services/streamanalytics/stream_analytics_job_schedule_resource.go @@ -0,0 +1,269 @@ +package streamanalytics + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/Azure/azure-sdk-for-go/services/streamanalytics/mgmt/2020-03-01/streamanalytics" + "github.com/Azure/go-autorest/autorest/date" + "github.com/hashicorp/terraform-provider-azurerm/helpers/validate" + "github.com/hashicorp/terraform-provider-azurerm/internal/locks" + "github.com/hashicorp/terraform-provider-azurerm/internal/sdk" + "github.com/hashicorp/terraform-provider-azurerm/internal/services/streamanalytics/parse" + streamAnalyticsValidate "github.com/hashicorp/terraform-provider-azurerm/internal/services/streamanalytics/validate" + "github.com/hashicorp/terraform-provider-azurerm/internal/tf/pluginsdk" + "github.com/hashicorp/terraform-provider-azurerm/internal/tf/validation" + "github.com/hashicorp/terraform-provider-azurerm/utils" +) + +type JobScheduleResource struct{} + +type JobScheduleResourceModel struct { + StreamAnalyticsJob string `tfschema:"stream_analytics_job_id"` + StartMode string `tfschema:"start_mode"` + StartTime string `tfschema:"start_time"` + LastOutputTime string `tfschema:"last_output_time"` +} + +func (r JobScheduleResource) Arguments() map[string]*pluginsdk.Schema { + return map[string]*pluginsdk.Schema{ + "stream_analytics_job_id": { + Type: pluginsdk.TypeString, + Required: true, + ForceNew: true, + ValidateFunc: streamAnalyticsValidate.StreamingJobID, + }, + + "start_mode": { + Type: pluginsdk.TypeString, + Required: true, + ValidateFunc: validation.StringInSlice([]string{ + string(streamanalytics.OutputStartModeCustomTime), + string(streamanalytics.OutputStartModeJobStartTime), + string(streamanalytics.OutputStartModeLastOutputEventTime), + }, false), + }, + + "start_time": { + Type: pluginsdk.TypeString, + Optional: true, + Computed: true, + ValidateFunc: validate.ISO8601DateTime, + }, + } +} + +func (r JobScheduleResource) Attributes() map[string]*pluginsdk.Schema { + return map[string]*pluginsdk.Schema{ + "last_output_time": { + Type: pluginsdk.TypeString, + Computed: true, + }, + } +} + +func (r JobScheduleResource) ModelObject() interface{} { + return &JobScheduleResourceModel{} +} + +func (r JobScheduleResource) ResourceType() string { + return "azurerm_stream_analytics_job_schedule" +} + +func (r JobScheduleResource) IDValidationFunc() pluginsdk.SchemaValidateFunc { + return streamAnalyticsValidate.StreamingJobScheduleID +} + +func (r JobScheduleResource) Create() sdk.ResourceFunc { + return sdk.ResourceFunc{ + Timeout: 30 * time.Minute, + Func: func(ctx context.Context, metadata sdk.ResourceMetaData) error { + var model JobScheduleResourceModel + if err := metadata.Decode(&model); err != nil { + return err + } + + client := metadata.Client.StreamAnalytics.JobsClient + streamAnalyticsId, err := parse.StreamingJobID(model.StreamAnalyticsJob) + if err != nil { + return err + } + + // This is a virtual resource so the last segment is hardcoded + id := parse.NewStreamingJobScheduleID(streamAnalyticsId.SubscriptionId, streamAnalyticsId.ResourceGroup, streamAnalyticsId.Name, "default") + + locks.ByID(id.ID()) + defer locks.UnlockByID(id.ID()) + + existing, err := client.Get(ctx, id.ResourceGroup, id.StreamingjobName, "") + if err != nil && !utils.ResponseWasNotFound(existing.Response) { + return fmt.Errorf("checking for presence of existing %s: %+v", id, err) + } + + outputStartMode := streamanalytics.OutputStartMode(model.StartMode) + if outputStartMode == streamanalytics.OutputStartModeLastOutputEventTime { + if v := existing.StreamingJobProperties.LastOutputEventTime; v == nil { + return fmt.Errorf("`start_mode` can only be set to `LastOutputEventTime` if this job was previously started") + } + } + + props := &streamanalytics.StartStreamingJobParameters{ + OutputStartMode: outputStartMode, + } + + if outputStartMode == streamanalytics.OutputStartModeCustomTime { + if model.StartTime == "" { + return fmt.Errorf("`start_time` must be specified is `start_mode` is set to `CustomTime`") + } else { + startTime, _ := date.ParseTime(time.RFC3339, model.StartTime) + log.Printf("sa_test time: %s", startTime) + outputStartTime := &date.Time{ + Time: startTime, + } + props.OutputStartTime = outputStartTime + } + } + + future, err := client.Start(ctx, id.ResourceGroup, id.StreamingjobName, props) + if err != nil { + return fmt.Errorf("creating %s: %+v", id, err) + } + + if err = future.WaitForCompletionRef(ctx, client.Client); err != nil { + return fmt.Errorf("waiting on create/update of %s: %+v", id, err) + } + + metadata.SetID(id) + + return nil + }, + } +} + +func (r JobScheduleResource) Read() sdk.ResourceFunc { + return sdk.ResourceFunc{ + Timeout: 5 * time.Minute, + Func: func(ctx context.Context, metadata sdk.ResourceMetaData) error { + client := metadata.Client.StreamAnalytics.JobsClient + id, err := parse.StreamingJobScheduleID(metadata.ResourceData.Id()) + if err != nil { + return err + } + + streamAnalyticsId := parse.NewStreamingJobID(id.SubscriptionId, id.ResourceGroup, id.StreamingjobName) + + resp, err := client.Get(ctx, id.ResourceGroup, id.StreamingjobName, "") + if err != nil { + if utils.ResponseWasNotFound(resp.Response) { + return metadata.MarkAsGone(id) + } + return fmt.Errorf("reading %s: %+v", *id, err) + } + + if props := resp.StreamingJobProperties; props != nil { + startTime := "" + if v := props.OutputStartTime; v != nil { + startTime = v.String() + } + + lastOutputTime := "" + if v := props.LastOutputEventTime; v != nil { + lastOutputTime = v.String() + } + + state := JobScheduleResourceModel{ + StreamAnalyticsJob: streamAnalyticsId.ID(), + StartMode: string(props.OutputStartMode), + StartTime: startTime, + LastOutputTime: lastOutputTime, + } + + return metadata.Encode(&state) + } + + return nil + }, + } +} + +func (r JobScheduleResource) Update() sdk.ResourceFunc { + return sdk.ResourceFunc{ + Timeout: 30 * time.Minute, + Func: func(ctx context.Context, metadata sdk.ResourceMetaData) error { + client := metadata.Client.StreamAnalytics.JobsClient + id, err := parse.StreamingJobScheduleID(metadata.ResourceData.Id()) + if err != nil { + return err + } + + var state JobScheduleResourceModel + if err := metadata.Decode(&state); err != nil { + return fmt.Errorf("decoding: %+v", err) + } + + if metadata.ResourceData.HasChanges("start_mode", "start_time") { + outputStartMode := streamanalytics.OutputStartMode(state.StartMode) + startTime, _ := date.ParseTime(time.RFC3339, state.StartTime) + outputStartTime := &date.Time{ + Time: startTime, + } + + props := &streamanalytics.StartStreamingJobParameters{ + OutputStartMode: outputStartMode, + } + + if outputStartMode == streamanalytics.OutputStartModeCustomTime { + props.OutputStartTime = outputStartTime + } + + existing, err := client.Get(ctx, id.ResourceGroup, id.StreamingjobName, "") + if err != nil { + return fmt.Errorf("retrieving %s: %+v", *id, err) + } + + if v := existing.StreamingJobProperties; v != nil && v.JobState != nil && *v.JobState == "Running" { + future, err := client.Stop(ctx, id.ResourceGroup, id.StreamingjobName) + if err != nil { + return err + } + if err := future.WaitForCompletionRef(ctx, client.Client); err != nil { + return fmt.Errorf("waiting for %s to stop: %+v", *id, err) + } + } + + if _, err = client.Start(ctx, id.ResourceGroup, id.StreamingjobName, props); err != nil { + return fmt.Errorf("updating %s: %+v", *id, err) + } + } + + return nil + }, + } +} + +func (r JobScheduleResource) Delete() sdk.ResourceFunc { + return sdk.ResourceFunc{ + Timeout: 30 * time.Minute, + Func: func(ctx context.Context, metadata sdk.ResourceMetaData) error { + client := metadata.Client.StreamAnalytics.JobsClient + id, err := parse.StreamingJobScheduleID(metadata.ResourceData.Id()) + if err != nil { + return err + } + + metadata.Logger.Infof("deleting %s", *id) + + future, err := client.Stop(ctx, id.ResourceGroup, id.StreamingjobName) + if err != nil { + return fmt.Errorf("deleting %s: %+v", *id, err) + } + + if err = future.WaitForCompletionRef(ctx, client.Client); err != nil { + return fmt.Errorf("waiting on deletion of %s: %+v", id, err) + } + return nil + }, + } +} diff --git a/internal/services/streamanalytics/stream_analytics_job_schedule_resource_test.go b/internal/services/streamanalytics/stream_analytics_job_schedule_resource_test.go new file mode 100644 index 000000000000..bbcc6231d1a9 --- /dev/null +++ b/internal/services/streamanalytics/stream_analytics_job_schedule_resource_test.go @@ -0,0 +1,230 @@ +package streamanalytics_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/hashicorp/terraform-provider-azurerm/internal/acceptance" + "github.com/hashicorp/terraform-provider-azurerm/internal/acceptance/check" + "github.com/hashicorp/terraform-provider-azurerm/internal/clients" + "github.com/hashicorp/terraform-provider-azurerm/internal/services/streamanalytics/parse" + "github.com/hashicorp/terraform-provider-azurerm/internal/tf/pluginsdk" + "github.com/hashicorp/terraform-provider-azurerm/utils" +) + +type StreamAnalyticsJobScheduleResource struct{} + +func TestAccStreamAnalyticsJobSchedule_basic(t *testing.T) { + data := acceptance.BuildTestData(t, "azurerm_stream_analytics_job_schedule", "test") + r := StreamAnalyticsJobScheduleResource{} + + data.ResourceTest(t, r, []acceptance.TestStep{ + { + Config: r.basic(data), + Check: acceptance.ComposeTestCheckFunc( + check.That(data.ResourceName).ExistsInAzure(r), + ), + }, + data.ImportStep(), + }) +} + +func TestAccStreamAnalyticsJobSchedule_customTime(t *testing.T) { + data := acceptance.BuildTestData(t, "azurerm_stream_analytics_job_schedule", "test") + r := StreamAnalyticsJobScheduleResource{} + + data.ResourceTest(t, r, []acceptance.TestStep{ + { + Config: r.customTime(data), + Check: acceptance.ComposeTestCheckFunc( + check.That(data.ResourceName).ExistsInAzure(r), + ), + }, + data.ImportStep(), + }) +} + +func TestAccStreamAnalyticsJobSchedule_lastOutputEventTime(t *testing.T) { + data := acceptance.BuildTestData(t, "azurerm_stream_analytics_job_schedule", "test") + r := StreamAnalyticsJobScheduleResource{} + + data.ResourceTest(t, r, []acceptance.TestStep{ + { + Config: r.basic(data), + Check: acceptance.ComposeTestCheckFunc( + check.That(data.ResourceName).ExistsInAzure(r), + ), + }, + data.ImportStep(), + { + Config: r.lastOutputEventTime(data), + Check: acceptance.ComposeTestCheckFunc( + check.That(data.ResourceName).ExistsInAzure(r), + ), + }, + data.ImportStep(), + }) +} + +func (r StreamAnalyticsJobScheduleResource) Exists(ctx context.Context, client *clients.Client, state *pluginsdk.InstanceState) (*bool, error) { + id, err := parse.StreamingJobScheduleID(state.ID) + if err != nil { + return nil, err + } + + resp, err := client.StreamAnalytics.JobsClient.Get(ctx, id.ResourceGroup, id.StreamingjobName, "") + if err != nil { + if utils.ResponseWasNotFound(resp.Response) { + return utils.Bool(false), err + } + return nil, fmt.Errorf("retrieving %s: %+v", *id, err) + } + return utils.Bool(resp.StreamingJobProperties != nil && resp.StreamingJobProperties.OutputStartTime != nil), nil +} + +func (r StreamAnalyticsJobScheduleResource) basic(data acceptance.TestData) string { + return fmt.Sprintf(` +%s + +resource "azurerm_stream_analytics_job_schedule" "test" { + stream_analytics_job_id = azurerm_stream_analytics_job.test.id + start_mode = "JobStartTime" + + depends_on = [ + azurerm_stream_analytics_job.test, + azurerm_stream_analytics_stream_input_blob.test, + azurerm_stream_analytics_output_blob.test, + ] +} +`, r.template(data)) +} + +func (r StreamAnalyticsJobScheduleResource) customTime(data acceptance.TestData) string { + utcNow := time.Now().UTC() + startDate := time.Date(utcNow.Year(), utcNow.Month(), 1, 0, 0, 0, 0, utcNow.Location()) + + return fmt.Sprintf(` +%s + +resource "azurerm_stream_analytics_job_schedule" "test" { + stream_analytics_job_id = azurerm_stream_analytics_job.test.id + start_mode = "CustomTime" + start_time = "%s" + + depends_on = [ + azurerm_stream_analytics_job.test, + azurerm_stream_analytics_stream_input_blob.test, + azurerm_stream_analytics_output_blob.test, + ] +} +`, r.template(data), startDate.Format(time.RFC3339)) +} + +func (r StreamAnalyticsJobScheduleResource) lastOutputEventTime(data acceptance.TestData) string { + return fmt.Sprintf(` +%s + +resource "azurerm_stream_analytics_job_schedule" "test" { + stream_analytics_job_id = azurerm_stream_analytics_job.test.id + start_mode = "LastOutputEventTime" + + depends_on = [ + azurerm_stream_analytics_job.test, + azurerm_stream_analytics_stream_input_blob.test, + azurerm_stream_analytics_output_blob.test, + ] +} + + +`, r.template(data)) +} + +func (r StreamAnalyticsJobScheduleResource) template(data acceptance.TestData) string { + return fmt.Sprintf(` +provider "azurerm" { + features {} +} + +resource "azurerm_resource_group" "test" { + name = "acctestRG-%[1]d" + location = "%[2]s" +} + +resource "azurerm_storage_account" "test" { + name = "acctestsa%[3]s" + resource_group_name = azurerm_resource_group.test.name + location = azurerm_resource_group.test.location + account_tier = "Standard" + account_replication_type = "LRS" +} + +resource "azurerm_storage_container" "test" { + name = "chonks" + storage_account_name = azurerm_storage_account.test.name + container_access_type = "private" +} + +resource "azurerm_storage_blob" "test" { + name = "chonkdata" + storage_account_name = azurerm_storage_account.test.name + storage_container_name = azurerm_storage_container.test.name + type = "Block" + source = "testdata/chonkdata.csv" +} + +resource "azurerm_stream_analytics_job" "test" { + name = "acctestjob-%[1]d" + resource_group_name = azurerm_resource_group.test.name + location = azurerm_resource_group.test.location + data_locale = "en-GB" + compatibility_level = "1.1" + events_late_arrival_max_delay_in_seconds = 10 + events_out_of_order_max_delay_in_seconds = 20 + events_out_of_order_policy = "Drop" + output_error_policy = "Stop" + streaming_units = 6 + + transformation_query = < **Note:** Setting `start_mode` to `LastOutputEventTime` is only possible if the job had been previously started and produced output. + +* `start_time` - (Optional) The time in ISO8601 format at which the Stream Analytics Job should be started e.g. `2022-04-01T00:00:00Z`. This property can only be specified if `start_mode` is set to `CustomTime` + +## Attributes Reference + +The following attributes are exported in addition to the arguments listed above: + +* `id` - The ID of the Stream Analytics Job. + +* `last_output_time` - The time at which the Stream Analytics job last produced an output. + +--- + +## Timeouts + +The `timeouts` block allows you to specify [timeouts](https://www.terraform.io/docs/configuration/resources.html#timeouts) for certain actions: + +* `create` - (Defaults to 30 minutes) Used when creating the Stream Analytics Job. +* `update` - (Defaults to 30 minutes) Used when updating the Stream Analytics Job. +* `read` - (Defaults to 5 minutes) Used when retrieving the Stream Analytics Job. +* `delete` - (Defaults to 30 minutes) Used when deleting the Stream Analytics Job. + +## Import + +Stream Analytics Job's can be imported using the `resource id`, e.g. + +```shell +terraform import azurerm_stream_analytics_job_schedule.example /subscriptions/00000000-0000-0000-0000-000000000000/resourcegroups/group1/providers/Microsoft.StreamAnalytics/streamingjobs/job1/schedule/default +``` From a5515701275409fb0ea93d2e45eeb7b39ade8f7a Mon Sep 17 00:00:00 2001 From: Steph Date: Tue, 12 Apr 2022 17:33:54 +0200 Subject: [PATCH 2/3] fix typos and update test --- .../stream_analytics_job_data_source_test.go | 3 +-- .../stream_analytics_job_schedule_resource.go | 10 ++++------ 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/internal/services/streamanalytics/stream_analytics_job_data_source_test.go b/internal/services/streamanalytics/stream_analytics_job_data_source_test.go index 0ad22df99e91..30ccd87ba85e 100644 --- a/internal/services/streamanalytics/stream_analytics_job_data_source_test.go +++ b/internal/services/streamanalytics/stream_analytics_job_data_source_test.go @@ -51,7 +51,6 @@ func TestAccDataSourceStreamAnalyticsJob_jobSchedule(t *testing.T) { Check: acceptance.ComposeTestCheckFunc( check.That(data.ResourceName).Key("start_mode").Exists(), check.That(data.ResourceName).Key("start_time").Exists(), - check.That(data.ResourceName).Key("last_output_time").Exists(), ), }, }) @@ -82,7 +81,7 @@ data "azurerm_stream_analytics_job" "test" { } func (d StreamAnalyticsJobDataSource) jobSchedule(data acceptance.TestData) string { - config := StreamAnalyticsJobScheduleResource{}.lastOutputEventTime(data) + config := StreamAnalyticsJobScheduleResource{}.customTime(data) return fmt.Sprintf(` %s diff --git a/internal/services/streamanalytics/stream_analytics_job_schedule_resource.go b/internal/services/streamanalytics/stream_analytics_job_schedule_resource.go index 392dfe92134a..70bee3f2e189 100644 --- a/internal/services/streamanalytics/stream_analytics_job_schedule_resource.go +++ b/internal/services/streamanalytics/stream_analytics_job_schedule_resource.go @@ -3,7 +3,6 @@ package streamanalytics import ( "context" "fmt" - "log" "time" "github.com/Azure/azure-sdk-for-go/services/streamanalytics/mgmt/2020-03-01/streamanalytics" @@ -115,10 +114,9 @@ func (r JobScheduleResource) Create() sdk.ResourceFunc { if outputStartMode == streamanalytics.OutputStartModeCustomTime { if model.StartTime == "" { - return fmt.Errorf("`start_time` must be specified is `start_mode` is set to `CustomTime`") + return fmt.Errorf("`start_time` must be specified if `start_mode` is set to `CustomTime`") } else { startTime, _ := date.ParseTime(time.RFC3339, model.StartTime) - log.Printf("sa_test time: %s", startTime) outputStartTime := &date.Time{ Time: startTime, } @@ -132,7 +130,7 @@ func (r JobScheduleResource) Create() sdk.ResourceFunc { } if err = future.WaitForCompletionRef(ctx, client.Client); err != nil { - return fmt.Errorf("waiting on create/update of %s: %+v", id, err) + return fmt.Errorf("waiting on create/update for %s: %+v", id, err) } metadata.SetID(id) @@ -159,7 +157,7 @@ func (r JobScheduleResource) Read() sdk.ResourceFunc { if utils.ResponseWasNotFound(resp.Response) { return metadata.MarkAsGone(id) } - return fmt.Errorf("reading %s: %+v", *id, err) + return fmt.Errorf("retrieving %s: %+v", *id, err) } if props := resp.StreamingJobProperties; props != nil { @@ -261,7 +259,7 @@ func (r JobScheduleResource) Delete() sdk.ResourceFunc { } if err = future.WaitForCompletionRef(ctx, client.Client); err != nil { - return fmt.Errorf("waiting on deletion of %s: %+v", id, err) + return fmt.Errorf("waiting for deletion of %s: %+v", id, err) } return nil }, From 23a50448898e04c3f50c795ab2965e13f7e6bbf2 Mon Sep 17 00:00:00 2001 From: Steph Date: Tue, 12 Apr 2022 18:11:21 +0200 Subject: [PATCH 3/3] flesh out example --- ...tream_analytics_job_schedule.html.markdown | 71 ++++++++++++++++++- 1 file changed, 69 insertions(+), 2 deletions(-) diff --git a/website/docs/r/stream_analytics_job_schedule.html.markdown b/website/docs/r/stream_analytics_job_schedule.html.markdown index 7348ab2e0789..91a731e80c20 100644 --- a/website/docs/r/stream_analytics_job_schedule.html.markdown +++ b/website/docs/r/stream_analytics_job_schedule.html.markdown @@ -18,6 +18,28 @@ resource "azurerm_resource_group" "example" { location = "West Europe" } +resource "azurerm_storage_account" "example" { + name = "example" + resource_group_name = azurerm_resource_group.example.name + location = azurerm_resource_group.example.location + account_tier = "Standard" + account_replication_type = "LRS" +} + +resource "azurerm_storage_container" "example" { + name = "example" + storage_account_name = azurerm_storage_account.example.name + container_access_type = "private" +} + +resource "azurerm_storage_blob" "example" { + name = "example" + storage_account_name = azurerm_storage_account.example.name + storage_container_name = azurerm_storage_container.example.name + type = "Block" + source = "example.csv" +} + resource "azurerm_stream_analytics_job" "example" { name = "example-job" resource_group_name = azurerm_resource_group.example.name @@ -36,10 +58,55 @@ resource "azurerm_stream_analytics_job" "example" { transformation_query = <