Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FHIR store streaming config #2145

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/3611.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
`healthcare`: Added support for `streaming_configs` to `google_healthcare_fhir_store`
```
265 changes: 265 additions & 0 deletions google-beta/resource_healthcare_fhir_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"log"
"reflect"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -143,6 +144,76 @@ Cloud Pub/Sub topic. Not having adequate permissions will cause the calls that s
},
},
},
"stream_configs": {
Type: schema.TypeList,
Optional: true,
Description: `A list of streaming configs that configure the destinations of streaming export for every resource mutation in
this FHIR store. Each store is allowed to have up to 10 streaming configs. After a new config is added, the next
resource mutation is streamed to the new location in addition to the existing ones. When a location is removed
from the list, the server stops streaming to that location. Before adding a new config, you must add the required
bigquery.dataEditor role to your project's Cloud Healthcare Service Agent service account. Some lag (typically on
the order of dozens of seconds) is expected before the results show up in the streaming destination.`,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"bigquery_destination": {
Type: schema.TypeList,
Required: true,
Description: `The destination BigQuery structure that contains both the dataset location and corresponding schema config.
The output is organized in one table per resource type. The server reuses the existing tables (if any) that
are named after the resource types, e.g. "Patient", "Observation". When there is no existing table for a given
resource type, the server attempts to create one.
See the [streaming config reference](https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores#streamconfig) for more details.`,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"dataset_uri": {
Type: schema.TypeString,
Required: true,
Description: `BigQuery URI to a dataset, up to 2000 characters long, in the format bq://projectId.bqDatasetId`,
},
"schema_config": {
Type: schema.TypeList,
Required: true,
Description: `The configuration for the exported BigQuery schema.`,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"recursive_structure_depth": {
Type: schema.TypeInt,
Required: true,
Description: `The depth for all recursive structures in the output analytics schema. For example, concept in the CodeSystem
resource is a recursive structure; when the depth is 2, the CodeSystem table will have a column called
concept.concept but not concept.concept.concept. If not specified or set to 0, the server will use the default
value 2. The maximum depth allowed is 5.`,
},
"schema_type": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: validation.StringInSlice([]string{"ANALYTICS", ""}, false),
Description: `Specifies the output schema type. Only ANALYTICS is supported at this time.
* ANALYTICS: Analytics schema defined by the FHIR community.
See https://github.com/FHIR/sql-on-fhir/blob/master/sql-on-fhir.md. Default value: "ANALYTICS" Possible values: ["ANALYTICS"]`,
Default: "ANALYTICS",
},
},
},
},
},
},
},
"resource_types": {
Type: schema.TypeList,
Optional: true,
Description: `Supply a FHIR resource type (such as "Patient" or "Observation"). See
https://www.hl7.org/fhir/valueset-resource-types.html for a list of all FHIR resource types. The server treats
an empty list as an intent to stream all the supported resource types in this FHIR store.`,
Elem: &schema.Schema{
Type: schema.TypeString,
},
},
},
},
},
"version": {
Type: schema.TypeString,
Optional: true,
Expand Down Expand Up @@ -212,6 +283,12 @@ func resourceHealthcareFhirStoreCreate(d *schema.ResourceData, meta interface{})
} else if v, ok := d.GetOkExists("notification_config"); !isEmptyValue(reflect.ValueOf(notificationConfigProp)) && (ok || !reflect.DeepEqual(v, notificationConfigProp)) {
obj["notificationConfig"] = notificationConfigProp
}
streamConfigsProp, err := expandHealthcareFhirStoreStreamConfigs(d.Get("stream_configs"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("stream_configs"); !isEmptyValue(reflect.ValueOf(streamConfigsProp)) && (ok || !reflect.DeepEqual(v, streamConfigsProp)) {
obj["streamConfigs"] = streamConfigsProp
}

url, err := replaceVars(d, config, "{{HealthcareBasePath}}{{dataset}}/fhirStores?fhirStoreId={{name}}")
if err != nil {
Expand Down Expand Up @@ -285,6 +362,9 @@ func resourceHealthcareFhirStoreRead(d *schema.ResourceData, meta interface{}) e
if err := d.Set("notification_config", flattenHealthcareFhirStoreNotificationConfig(res["notificationConfig"], d, config)); err != nil {
return fmt.Errorf("Error reading FhirStore: %s", err)
}
if err := d.Set("stream_configs", flattenHealthcareFhirStoreStreamConfigs(res["streamConfigs"], d, config)); err != nil {
return fmt.Errorf("Error reading FhirStore: %s", err)
}

return nil
}
Expand All @@ -311,6 +391,12 @@ func resourceHealthcareFhirStoreUpdate(d *schema.ResourceData, meta interface{})
} else if v, ok := d.GetOkExists("notification_config"); !isEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, notificationConfigProp)) {
obj["notificationConfig"] = notificationConfigProp
}
streamConfigsProp, err := expandHealthcareFhirStoreStreamConfigs(d.Get("stream_configs"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("stream_configs"); !isEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, streamConfigsProp)) {
obj["streamConfigs"] = streamConfigsProp
}

url, err := replaceVars(d, config, "{{HealthcareBasePath}}{{dataset}}/fhirStores/{{name}}")
if err != nil {
Expand All @@ -331,6 +417,10 @@ func resourceHealthcareFhirStoreUpdate(d *schema.ResourceData, meta interface{})
if d.HasChange("notification_config") {
updateMask = append(updateMask, "notificationConfig")
}

if d.HasChange("stream_configs") {
updateMask = append(updateMask, "streamConfigs")
}
// updateMask is a URL parameter but not present in the schema, so replaceVars
// won't set it
url, err = addQueryParams(url, map[string]string{"updateMask": strings.Join(updateMask, ",")})
Expand Down Expand Up @@ -426,6 +516,84 @@ func flattenHealthcareFhirStoreNotificationConfigPubsubTopic(v interface{}, d *s
return v
}

func flattenHealthcareFhirStoreStreamConfigs(v interface{}, d *schema.ResourceData, config *Config) interface{} {
if v == nil {
return v
}
l := v.([]interface{})
transformed := make([]interface{}, 0, len(l))
for _, raw := range l {
original := raw.(map[string]interface{})
if len(original) < 1 {
// Do not include empty json objects coming back from the api
continue
}
transformed = append(transformed, map[string]interface{}{
"resource_types": flattenHealthcareFhirStoreStreamConfigsResourceTypes(original["resourceTypes"], d, config),
"bigquery_destination": flattenHealthcareFhirStoreStreamConfigsBigqueryDestination(original["bigqueryDestination"], d, config),
})
}
return transformed
}
func flattenHealthcareFhirStoreStreamConfigsResourceTypes(v interface{}, d *schema.ResourceData, config *Config) interface{} {
return v
}

func flattenHealthcareFhirStoreStreamConfigsBigqueryDestination(v interface{}, d *schema.ResourceData, config *Config) interface{} {
if v == nil {
return nil
}
original := v.(map[string]interface{})
if len(original) == 0 {
return nil
}
transformed := make(map[string]interface{})
transformed["dataset_uri"] =
flattenHealthcareFhirStoreStreamConfigsBigqueryDestinationDatasetUri(original["datasetUri"], d, config)
transformed["schema_config"] =
flattenHealthcareFhirStoreStreamConfigsBigqueryDestinationSchemaConfig(original["schemaConfig"], d, config)
return []interface{}{transformed}
}
func flattenHealthcareFhirStoreStreamConfigsBigqueryDestinationDatasetUri(v interface{}, d *schema.ResourceData, config *Config) interface{} {
return v
}

func flattenHealthcareFhirStoreStreamConfigsBigqueryDestinationSchemaConfig(v interface{}, d *schema.ResourceData, config *Config) interface{} {
if v == nil {
return nil
}
original := v.(map[string]interface{})
if len(original) == 0 {
return nil
}
transformed := make(map[string]interface{})
transformed["schema_type"] =
flattenHealthcareFhirStoreStreamConfigsBigqueryDestinationSchemaConfigSchemaType(original["schemaType"], d, config)
transformed["recursive_structure_depth"] =
flattenHealthcareFhirStoreStreamConfigsBigqueryDestinationSchemaConfigRecursiveStructureDepth(original["recursiveStructureDepth"], d, config)
return []interface{}{transformed}
}
func flattenHealthcareFhirStoreStreamConfigsBigqueryDestinationSchemaConfigSchemaType(v interface{}, d *schema.ResourceData, config *Config) interface{} {
return v
}

func flattenHealthcareFhirStoreStreamConfigsBigqueryDestinationSchemaConfigRecursiveStructureDepth(v interface{}, d *schema.ResourceData, config *Config) interface{} {
// Handles the string fixed64 format
if strVal, ok := v.(string); ok {
if intVal, err := strconv.ParseInt(strVal, 10, 64); err == nil {
return intVal
}
}

// number values are represented as float64
if floatVal, ok := v.(float64); ok {
intVal := int(floatVal)
return intVal
}

return v // let terraform core handle it otherwise
}

func expandHealthcareFhirStoreName(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
return v, nil
}
Expand Down Expand Up @@ -484,6 +652,103 @@ func expandHealthcareFhirStoreNotificationConfigPubsubTopic(v interface{}, d Ter
return v, nil
}

func expandHealthcareFhirStoreStreamConfigs(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
l := v.([]interface{})
req := make([]interface{}, 0, len(l))
for _, raw := range l {
if raw == nil {
continue
}
original := raw.(map[string]interface{})
transformed := make(map[string]interface{})

transformedResourceTypes, err := expandHealthcareFhirStoreStreamConfigsResourceTypes(original["resource_types"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedResourceTypes); val.IsValid() && !isEmptyValue(val) {
transformed["resourceTypes"] = transformedResourceTypes
}

transformedBigqueryDestination, err := expandHealthcareFhirStoreStreamConfigsBigqueryDestination(original["bigquery_destination"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedBigqueryDestination); val.IsValid() && !isEmptyValue(val) {
transformed["bigqueryDestination"] = transformedBigqueryDestination
}

req = append(req, transformed)
}
return req, nil
}

func expandHealthcareFhirStoreStreamConfigsResourceTypes(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
return v, nil
}

func expandHealthcareFhirStoreStreamConfigsBigqueryDestination(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
return nil, nil
}
raw := l[0]
original := raw.(map[string]interface{})
transformed := make(map[string]interface{})

transformedDatasetUri, err := expandHealthcareFhirStoreStreamConfigsBigqueryDestinationDatasetUri(original["dataset_uri"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedDatasetUri); val.IsValid() && !isEmptyValue(val) {
transformed["datasetUri"] = transformedDatasetUri
}

transformedSchemaConfig, err := expandHealthcareFhirStoreStreamConfigsBigqueryDestinationSchemaConfig(original["schema_config"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedSchemaConfig); val.IsValid() && !isEmptyValue(val) {
transformed["schemaConfig"] = transformedSchemaConfig
}

return transformed, nil
}

func expandHealthcareFhirStoreStreamConfigsBigqueryDestinationDatasetUri(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
return v, nil
}

func expandHealthcareFhirStoreStreamConfigsBigqueryDestinationSchemaConfig(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
return nil, nil
}
raw := l[0]
original := raw.(map[string]interface{})
transformed := make(map[string]interface{})

transformedSchemaType, err := expandHealthcareFhirStoreStreamConfigsBigqueryDestinationSchemaConfigSchemaType(original["schema_type"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedSchemaType); val.IsValid() && !isEmptyValue(val) {
transformed["schemaType"] = transformedSchemaType
}

transformedRecursiveStructureDepth, err := expandHealthcareFhirStoreStreamConfigsBigqueryDestinationSchemaConfigRecursiveStructureDepth(original["recursive_structure_depth"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedRecursiveStructureDepth); val.IsValid() && !isEmptyValue(val) {
transformed["recursiveStructureDepth"] = transformedRecursiveStructureDepth
}

return transformed, nil
}

func expandHealthcareFhirStoreStreamConfigsBigqueryDestinationSchemaConfigSchemaType(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
return v, nil
}

func expandHealthcareFhirStoreStreamConfigsBigqueryDestinationSchemaConfigRecursiveStructureDepth(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
return v, nil
}

func resourceHealthcareFhirStoreDecoder(d *schema.ResourceData, meta interface{}, res map[string]interface{}) (map[string]interface{}, error) {
// Take the returned long form of the name and use it as `self_link`.
// Then modify the name to be the user specified form.
Expand Down
Loading