Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

azurerm_stream_analytics_job - support new property job_type #16548

Merged
merged 2 commits into from
Apr 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,17 @@ func resourceStreamAnalyticsJob() *pluginsdk.Resource {
Default: string(streamanalytics.EventsOutOfOrderPolicyAdjust),
},

"type": {
Type: pluginsdk.TypeString,
Optional: true,
ForceNew: true,
ValidateFunc: validation.StringInSlice([]string{
string(streamanalytics.JobTypeCloud),
string(streamanalytics.JobTypeEdge),
}, false),
Default: string(streamanalytics.JobTypeCloud),
},

"output_error_policy": {
Type: pluginsdk.TypeString,
Optional: true,
Expand All @@ -114,7 +125,7 @@ func resourceStreamAnalyticsJob() *pluginsdk.Resource {

"streaming_units": {
Type: pluginsdk.TypeInt,
Required: true,
Optional: true,
ValidateFunc: validate.StreamAnalyticsJobStreamingUnits,
},

Expand Down Expand Up @@ -167,21 +178,32 @@ func resourceStreamAnalyticsJobCreateUpdate(d *pluginsdk.ResourceData, meta inte
eventsLateArrivalMaxDelayInSeconds := d.Get("events_late_arrival_max_delay_in_seconds").(int)
eventsOutOfOrderMaxDelayInSeconds := d.Get("events_out_of_order_max_delay_in_seconds").(int)
eventsOutOfOrderPolicy := d.Get("events_out_of_order_policy").(string)
jobType := d.Get("type").(string)
location := azure.NormalizeLocation(d.Get("location").(string))
outputErrorPolicy := d.Get("output_error_policy").(string)
streamingUnits := d.Get("streaming_units").(int)
transformationQuery := d.Get("transformation_query").(string)
t := d.Get("tags").(map[string]interface{})

// needs to be defined inline for a Create but via a separate API for Update
transformation := streamanalytics.Transformation{
Name: utils.String("main"),
TransformationProperties: &streamanalytics.TransformationProperties{
StreamingUnits: utils.Int32(int32(streamingUnits)),
Query: utils.String(transformationQuery),
Query: utils.String(transformationQuery),
},
}

if jobType == string(streamanalytics.JobTypeEdge) {
if _, ok := d.GetOk("streaming_units"); ok {
return fmt.Errorf("the job type `Edge` doesn't support `streaming_units`")
}
} else {
if v, ok := d.GetOk("streaming_units"); ok {
transformation.TransformationProperties.StreamingUnits = utils.Int32(int32(v.(int)))
} else {
return fmt.Errorf("`streaming_units` must be set when `type` is `Cloud`")
}
}

expandedIdentity, err := expandStreamAnalyticsJobIdentity(d.Get("identity").([]interface{}))
if err != nil {
return fmt.Errorf("expanding `identity`: %+v", err)
Expand All @@ -199,18 +221,25 @@ func resourceStreamAnalyticsJobCreateUpdate(d *pluginsdk.ResourceData, meta inte
EventsOutOfOrderMaxDelayInSeconds: utils.Int32(int32(eventsOutOfOrderMaxDelayInSeconds)),
EventsOutOfOrderPolicy: streamanalytics.EventsOutOfOrderPolicy(eventsOutOfOrderPolicy),
OutputErrorPolicy: streamanalytics.OutputErrorPolicy(outputErrorPolicy),
JobType: streamanalytics.JobType(jobType),
},
Identity: expandedIdentity,
Tags: tags.Expand(t),
}

if streamAnalyticsCluster := d.Get("stream_analytics_cluster_id"); streamAnalyticsCluster != "" {
props.StreamingJobProperties.Cluster = &streamanalytics.ClusterInfo{
ID: utils.String(streamAnalyticsCluster.(string)),
if jobType == string(streamanalytics.JobTypeEdge) {
if _, ok := d.GetOk("stream_analytics_cluster_id"); ok {
return fmt.Errorf("the job type `Edge` doesn't support `stream_analytics_cluster_id`")
}
} else {
props.StreamingJobProperties.Cluster = &streamanalytics.ClusterInfo{
ID: nil,
if streamAnalyticsCluster := d.Get("stream_analytics_cluster_id"); streamAnalyticsCluster != "" {
props.StreamingJobProperties.Cluster = &streamanalytics.ClusterInfo{
ID: utils.String(streamAnalyticsCluster.(string)),
}
} else {
props.StreamingJobProperties.Cluster = &streamanalytics.ClusterInfo{
ID: nil,
}
}
}

Expand Down Expand Up @@ -297,6 +326,7 @@ func resourceStreamAnalyticsJobRead(d *pluginsdk.ResourceData, meta interface{})
}
d.Set("events_out_of_order_policy", string(props.EventsOutOfOrderPolicy))
d.Set("output_error_policy", string(props.OutputErrorPolicy))
d.Set("type", string(props.JobType))

// Computed
d.Set("job_id", props.JobID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,36 @@ func TestAccStreamAnalyticsJob_identity(t *testing.T) {
})
}

// TestAccStreamAnalyticsJob_jobTypeCloud provisions a Stream Analytics Job with
// `type = "Cloud"` set explicitly, confirms it exists in Azure, and verifies a
// clean import of the resulting resource.
func TestAccStreamAnalyticsJob_jobTypeCloud(t *testing.T) {
	testData := acceptance.BuildTestData(t, "azurerm_stream_analytics_job", "test")
	resource := StreamAnalyticsJobResource{}

	steps := []acceptance.TestStep{
		{
			Config: resource.jobTypeCloud(testData),
			Check: acceptance.ComposeTestCheckFunc(
				check.That(testData.ResourceName).ExistsInAzure(resource),
			),
		},
		testData.ImportStep(),
	}
	testData.ResourceTest(t, resource, steps)
}

// TestAccStreamAnalyticsJob_jobTypeEdge provisions a Stream Analytics Job with
// `type = "Edge"` (which omits `streaming_units`), confirms it exists in Azure,
// and verifies a clean import of the resulting resource.
func TestAccStreamAnalyticsJob_jobTypeEdge(t *testing.T) {
	testData := acceptance.BuildTestData(t, "azurerm_stream_analytics_job", "test")
	resource := StreamAnalyticsJobResource{}

	steps := []acceptance.TestStep{
		{
			Config: resource.jobTypeEdge(testData),
			Check: acceptance.ComposeTestCheckFunc(
				check.That(testData.ResourceName).ExistsInAzure(resource),
			),
		},
		testData.ImportStep(),
	}
	testData.ResourceTest(t, resource, steps)
}

func (r StreamAnalyticsJobResource) Exists(ctx context.Context, client *clients.Client, state *pluginsdk.InstanceState) (*bool, error) {
id, err := parse.StreamingJobID(state.ID)
if err != nil {
Expand Down Expand Up @@ -278,3 +308,56 @@ QUERY
}
`, data.RandomInteger, data.Locations.Primary, data.RandomInteger)
}

// jobTypeCloud returns an acceptance-test HCL configuration for a Stream
// Analytics Job with `type = "Cloud"`; `streaming_units` is set because the
// Cloud job type requires it. The three format verbs are filled with the
// test run's random integer (twice, for unique names) and primary location.
func (r StreamAnalyticsJobResource) jobTypeCloud(data acceptance.TestData) string {
return fmt.Sprintf(`
provider "azurerm" {
features {}
}

resource "azurerm_resource_group" "test" {
name = "acctestRG-%d"
location = "%s"
}

resource "azurerm_stream_analytics_job" "test" {
name = "acctestjob-%d"
resource_group_name = azurerm_resource_group.test.name
location = azurerm_resource_group.test.location
streaming_units = 3
type = "Cloud"

transformation_query = <<QUERY
SELECT *
INTO [YourOutputAlias]
FROM [YourInputAlias]
QUERY
}
`, data.RandomInteger, data.Locations.Primary, data.RandomInteger)
}

// jobTypeEdge returns an acceptance-test HCL configuration for a Stream
// Analytics Job with `type = "Edge"`; `streaming_units` is deliberately
// omitted because the Edge job type does not support it. The three format
// verbs are filled with the test run's random integer (twice, for unique
// names) and primary location.
func (r StreamAnalyticsJobResource) jobTypeEdge(data acceptance.TestData) string {
return fmt.Sprintf(`
provider "azurerm" {
features {}
}

resource "azurerm_resource_group" "test" {
name = "acctestRG-%d"
location = "%s"
}

resource "azurerm_stream_analytics_job" "test" {
name = "acctestjob-%d"
resource_group_name = azurerm_resource_group.test.name
location = azurerm_resource_group.test.location
type = "Edge"

transformation_query = <<QUERY
SELECT *
INTO [YourOutputAlias]
FROM [YourInputAlias]
QUERY
}
`, data.RandomInteger, data.Locations.Primary, data.RandomInteger)
}
8 changes: 7 additions & 1 deletion website/docs/r/stream_analytics_job.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,17 @@ The following arguments are supported:

* `events_out_of_order_policy` - (Optional) Specifies the policy which should be applied to events which arrive out of order in the input event stream. Possible values are `Adjust` and `Drop`. Default is `Adjust`.

* `type` - (Optional) The type of the Stream Analytics Job. Possible values are `Cloud` and `Edge`. Defaults to `Cloud`. Changing this forces a new resource to be created.

-> **NOTE:** The `Edge` job type doesn't support `stream_analytics_cluster_id` or `streaming_units`.

* `identity` - (Optional) An `identity` block as defined below.

* `output_error_policy` - (Optional) Specifies the policy which should be applied to events which arrive at the output and cannot be written to the external storage due to being malformed (such as missing column values, column values of wrong type or size). Possible values are `Drop` and `Stop`. Default is `Drop`.

* `streaming_units` - (Required) Specifies the number of streaming units that the streaming job uses. Supported values are `1`, `3`, `6` and multiples of `6` up to `120`.
* `streaming_units` - (Optional) Specifies the number of streaming units that the streaming job uses. Supported values are `1`, `3`, `6` and multiples of `6` up to `120`.

-> **NOTE:** `streaming_units` must be set when `type` is `Cloud`.

* `transformation_query` - (Required) Specifies the query that will be run in the streaming job, [written in Stream Analytics Query Language (SAQL)](https://msdn.microsoft.com/library/azure/dn834998).

Expand Down