azurerm_stream_analytics_output_blob support for type Parquet & batch_max_wait_time/batch_min_rows #13245

Merged: 8 commits on Sep 9, 2021
21 changes: 21 additions & 0 deletions internal/services/streamanalytics/helpers_output.go
@@ -23,6 +23,7 @@ func schemaStreamAnalyticsOutputSerialization() *pluginsdk.Schema {
string(streamanalytics.TypeAvro),
string(streamanalytics.TypeCsv),
string(streamanalytics.TypeJSON),
string(streamanalytics.TypeParquet),
}, false),
},

@@ -119,6 +120,21 @@ func expandStreamAnalyticsOutputSerialization(input []interface{}) (streamanalyt
Format: streamanalytics.JSONOutputSerializationFormat(format),
},
}, nil

case streamanalytics.TypeParquet:
if encoding != "" {
return nil, fmt.Errorf("`encoding` cannot be set when `type` is set to `Parquet`")
}
if fieldDelimiter != "" {
return nil, fmt.Errorf("`field_delimiter` cannot be set when `type` is set to `Parquet`")
}
if format != "" {
return nil, fmt.Errorf("`format` cannot be set when `type` is set to `Parquet`")
}
return streamanalytics.ParquetSerialization{
Type: streamanalytics.TypeParquet,
Properties: map[string]interface{}{},
}, nil
}

return nil, fmt.Errorf("Unsupported Output Type %q", outputType)
@@ -153,6 +169,11 @@ func flattenStreamAnalyticsOutputSerialization(input streamanalytics.BasicSerial

outputType = string(streamanalytics.TypeJSON)
}

if _, ok := input.AsParquetSerialization(); ok {
outputType = string(streamanalytics.TypeParquet)
}

return []interface{}{
map[string]interface{}{
"encoding": encoding,
@@ -11,6 +11,7 @@ import (
"github.com/hashicorp/terraform-provider-azurerm/helpers/tf"
"github.com/hashicorp/terraform-provider-azurerm/internal/clients"
"github.com/hashicorp/terraform-provider-azurerm/internal/services/streamanalytics/parse"
"github.com/hashicorp/terraform-provider-azurerm/internal/services/streamanalytics/validate"
"github.com/hashicorp/terraform-provider-azurerm/internal/tf/pluginsdk"
"github.com/hashicorp/terraform-provider-azurerm/internal/tf/validation"
"github.com/hashicorp/terraform-provider-azurerm/internal/timeouts"
@@ -89,6 +90,17 @@ func resourceStreamAnalyticsOutputBlob() *pluginsdk.Resource {
},

"serialization": schemaStreamAnalyticsOutputSerialization(),

"batch_max_wait_time": {
Type: pluginsdk.TypeString,
Optional: true,
ValidateFunc: validate.BatchMaxWaitTime,
},
"batch_min_rows": {
Type: pluginsdk.TypeFloat,
Optional: true,
ValidateFunc: validation.FloatBetween(0, 10000),
},
},
}
}
@@ -151,6 +163,20 @@ func resourceStreamAnalyticsOutputBlobCreateUpdate(d *pluginsdk.ResourceData, me
},
}

if batchMaxWaitTime, ok := d.GetOk("batch_max_wait_time"); ok {
props.TimeWindow = utils.String(batchMaxWaitTime.(string))
}

if batchMinRows, ok := d.GetOk("batch_min_rows"); ok {
props.SizeWindow = utils.Float(batchMinRows.(float64))
}

// timeWindow and sizeWindow must be set for Parquet serialization
_, isParquet := serialization.AsParquetSerialization()
if isParquet && (props.TimeWindow == nil || props.SizeWindow == nil) {
return fmt.Errorf("cannot create Stream Analytics Output Blob %q (Job %q / Resource Group %q): batch_min_rows and batch_time_window must be set for Parquet serialization", name, jobName, resourceGroup)
}

if d.IsNewResource() {
if _, err := client.CreateOrReplace(ctx, props, resourceGroup, jobName, name, "", ""); err != nil {
return fmt.Errorf("Creating Stream Analytics Output Blob %q (Job %q / Resource Group %q): %+v", name, jobName, resourceGroup, err)
@@ -216,6 +242,9 @@ func resourceStreamAnalyticsOutputBlobRead(d *pluginsdk.ResourceData, meta inter
if err := d.Set("serialization", flattenStreamAnalyticsOutputSerialization(props.Serialization)); err != nil {
return fmt.Errorf("setting `serialization`: %+v", err)
}
d.Set("batch_max_wait_time", props.TimeWindow)
d.Set("batch_min_rows", props.SizeWindow)

}

return nil
@@ -59,6 +59,21 @@ func TestAccStreamAnalyticsOutputBlob_json(t *testing.T) {
})
}

func TestAccStreamAnalyticsOutputBlob_parquet(t *testing.T) {
data := acceptance.BuildTestData(t, "azurerm_stream_analytics_output_blob", "test")
r := StreamAnalyticsOutputBlobResource{}

data.ResourceTest(t, r, []acceptance.TestStep{
{
Config: r.parquet(data),
Check: acceptance.ComposeTestCheckFunc(
check.That(data.ResourceName).ExistsInAzure(r),
),
},
data.ImportStep("storage_account_key"),
})
}

func TestAccStreamAnalyticsOutputBlob_update(t *testing.T) {
data := acceptance.BuildTestData(t, "azurerm_stream_analytics_output_blob", "test")
r := StreamAnalyticsOutputBlobResource{}
@@ -183,6 +198,31 @@ resource "azurerm_stream_analytics_output_blob" "test" {
`, template, data.RandomInteger)
}

func (r StreamAnalyticsOutputBlobResource) parquet(data acceptance.TestData) string {
template := r.template(data)
return fmt.Sprintf(`
%s

resource "azurerm_stream_analytics_output_blob" "test" {
name = "acctestinput-%d"
stream_analytics_job_name = azurerm_stream_analytics_job.test.name
resource_group_name = azurerm_stream_analytics_job.test.resource_group_name
storage_account_name = azurerm_storage_account.test.name
storage_account_key = azurerm_storage_account.test.primary_access_key
storage_container_name = azurerm_storage_container.test.name
path_pattern = "some-other-pattern"
date_format = "yyyy-MM-dd"
time_format = "HH"
batch_max_wait_time = "00:02:00"
batch_min_rows = 5000

serialization {
type = "Parquet"
}
}
`, template, data.RandomInteger)
}

func (r StreamAnalyticsOutputBlobResource) updated(data acceptance.TestData) string {
template := r.template(data)
return fmt.Sprintf(`
@@ -0,0 +1,24 @@
package validate

import (
"fmt"
"regexp"
)

func BatchMaxWaitTime(input interface{}, key string) (warnings []string, errors []error) {
value, ok := input.(string)
if !ok {
errors = append(errors, fmt.Errorf("expected type of %s to be string", key))
return
}

if value == "" {
errors = append(errors, fmt.Errorf("%q must not be empty", key))
}

if matched := regexp.MustCompile(`[0-9]{2}:[0-9]{2}:[0-9]{2}`).Match([]byte(value)); !matched {
errors = append(errors, fmt.Errorf("%q must have the following format hh:mm:ss", key))
}

return warnings, errors
}
@@ -0,0 +1,23 @@
package validate

import "testing"

func TestBatchMaxWaitTime(t *testing.T) {
cases := map[string]bool{
"": false,
"NotValid": false,
"10:00": false,
"00:02:00": true,
"00:00:00": true,
"99:99:99": true,
"2": false,
}
for i, shouldBeValid := range cases {
_, errors := BatchMaxWaitTime(i, "batch_max_wait_time")

isValid := len(errors) == 0
if shouldBeValid != isValid {
t.Fatalf("Expected %s to be %t but got %t", i, shouldBeValid, isValid)
}
}
}
8 changes: 7 additions & 1 deletion website/docs/r/stream_analytics_output_blob.html.markdown
@@ -79,11 +79,17 @@ The following arguments are supported:

* `serialization` - (Required) A `serialization` block as defined below.

* `batch_max_wait_time` - (Optional) The maximum wait time per batch, in `hh:mm:ss` format, e.g. `00:02:00` for two minutes.

* `batch_min_rows` - (Optional) The minimum number of rows per batch (must be between `0` and `10000`).

---

A `serialization` block supports the following:

* `type` - (Required) The serialization format used for outgoing data streams. Possible values are `Avro`, `Csv` and `Json`.
* `type` - (Required) The serialization format used for outgoing data streams. Possible values are `Avro`, `Csv`, `Json` and `Parquet`.

-> **NOTE:** `batch_max_wait_time` and `batch_min_rows` are required when `type` is set to `Parquet`.
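A minimal sketch of a Parquet configuration combining the new arguments (the remaining required arguments are elided here; values are illustrative and mirror the acceptance test above):

```hcl
resource "azurerm_stream_analytics_output_blob" "example" {
  # ... name, stream_analytics_job_name, resource_group_name, storage account,
  # container, path_pattern, date_format and time_format as usual ...

  batch_max_wait_time = "00:02:00" # maximum wait per batch, hh:mm:ss
  batch_min_rows      = 5000       # minimum rows per batch, between 0 and 10000

  serialization {
    type = "Parquet" # encoding, field_delimiter and format must not be set for Parquet
  }
}
```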

* `encoding` - (Optional) The encoding of the incoming data in the case of input and the encoding of outgoing data in the case of output. It currently can only be set to `UTF8`.
