Terraform BigQuery Table Hive partitioning support (#3335) (#6488)
* range partitioning for BigQuery is GA

* add hive partitioning options to google_bigquery_table

* improve formatting of bigquery table hive partitioning options

* correct indenting on resource_bigquery_table_test.go

* minor fix to the bigquery table documentation

* align bigquery table test with upstream changes

* gofmt on resource_bigquery_table.go, resource_bigquery_table_test.go

Signed-off-by: Modular Magician <[email protected]>
modular-magician authored May 29, 2020
1 parent dd10841 commit 6294fae
Showing 4 changed files with 326 additions and 2 deletions.
6 changes: 6 additions & 0 deletions .changelog/3335.txt
@@ -0,0 +1,6 @@
```release-note:enhancement
bigquery: Added support for `google_bigquery_table` `hive_partitioning_options`
```
```release-note:enhancement
bigquery: Added `google_bigquery_table` `range_partitioning` to GA
```
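Taken together, these two entries surface as new arguments on `google_bigquery_table`. Below is a minimal sketch of what they look like in configuration — the bucket, dataset, and table names are illustrative, not taken from this commit, and full working configurations appear in the test file further down:

```hcl
# Hive partition detection on an external table (new in this change).
resource "google_bigquery_table" "external_sketch" {
  dataset_id = "my_dataset"
  table_id   = "my_external_table"

  external_data_configuration {
    source_format = "CSV"
    autodetect    = true
    source_uris   = ["gs://my-bucket/*"]

    hive_partitioning_options {
      mode              = "AUTO" # or "STRINGS"
      source_uri_prefix = "gs://my-bucket/"
    }
  }
}

# Integer range partitioning (promoted to GA in this change).
resource "google_bigquery_table" "range_sketch" {
  dataset_id = "my_dataset"
  table_id   = "my_partitioned_table"

  range_partitioning {
    field = "id"

    range {
      start    = 0
      end      = 10000
      interval = 100
    }
  }

  schema = <<EOH
[
  {
    "name": "id",
    "type": "INTEGER"
  }
]
EOH
}
```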
176 changes: 175 additions & 1 deletion google/resource_bigquery_table.go
@@ -1,8 +1,8 @@
//
package google

import (
"encoding/json"
"errors"
"fmt"
"log"

@@ -183,6 +183,31 @@ func resourceBigQueryTable() *schema.Resource {
                },
            },

            // HivePartitioningOptions: [Optional] Options for configuring hive partitioning detection.
            "hive_partitioning_options": {
                Type:     schema.TypeList,
                Optional: true,
                MaxItems: 1,
                Elem: &schema.Resource{
                    Schema: map[string]*schema.Schema{
                        // Mode: [Optional] [Experimental] When set, what mode of hive partitioning to use when reading data.
                        // Two modes are supported:
                        // * AUTO: automatically infer partition key name(s) and type(s).
                        // * STRINGS: automatically infer partition key name(s); all types are interpreted as strings.
                        "mode": {
                            Type:     schema.TypeString,
                            Optional: true,
                        },
                        // SourceUriPrefix: [Optional] [Experimental] When hive partition detection is requested,
                        // a common prefix for all source URIs must be supplied.
                        // The prefix must end immediately before the partition key encoding begins.
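                        // For example, with objects laid out as
                        //   gs://bucket/path_to_table/dt=2020-05-29/lang=en/part-000.csv
                        // the prefix would be "gs://bucket/path_to_table/" (illustrative
                        // layout, not part of this commit).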
"source_uri_prefix": {
Type: schema.TypeString,
Optional: true,
},
},
},
},

            // IgnoreUnknownValues: [Optional] Indicates if BigQuery should
            // allow extra values that are not represented in the table schema.
            // If true, the extra values are ignored. If false, records with
@@ -306,6 +331,53 @@ func resourceBigQueryTable() *schema.Resource {
                },
            },

            // RangePartitioning: [Optional] If specified, configures range-based
            // partitioning for this table.
            "range_partitioning": {
                Type:     schema.TypeList,
                Optional: true,
                MaxItems: 1,
                Elem: &schema.Resource{
                    Schema: map[string]*schema.Schema{
                        // Field: [Required] The field used to determine how to create a range-based
                        // partition.
                        "field": {
                            Type:     schema.TypeString,
                            Required: true,
                            ForceNew: true,
                        },

                        // Range: [Required] Information required to partition based on ranges.
                        "range": {
                            Type:     schema.TypeList,
                            Required: true,
                            MaxItems: 1,
                            Elem: &schema.Resource{
                                Schema: map[string]*schema.Schema{
                                    // Start: [Required] Start of the range partitioning, inclusive.
                                    "start": {
                                        Type:     schema.TypeInt,
                                        Required: true,
                                    },

                                    // End: [Required] End of the range partitioning, exclusive.
                                    "end": {
                                        Type:     schema.TypeInt,
                                        Required: true,
                                    },

                                    // Interval: [Required] The width of each range within the partition.
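                                    // For example, start = 0, end = 100, interval = 10 yields
                                    // the partitions [0, 10), [10, 20), ..., [90, 100);
                                    // out-of-range values land in __UNPARTITIONED__
                                    // (illustrative values, not part of this commit).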
"interval": {
Type: schema.TypeInt,
Required: true,
},
},
},
},
},
},
},

            // Clustering: [Optional] Specifies column names to use for data clustering. Up to four
            // top-level columns are allowed, and should be specified in descending priority order.
            "clustering": {
@@ -468,6 +540,15 @@ func resourceTable(d *schema.ResourceData, meta interface{}) (*bigquery.Table, error) {
        table.TimePartitioning = expandTimePartitioning(v)
    }

if v, ok := d.GetOk("range_partitioning"); ok {
rangePartitioning, err := expandRangePartitioning(v)
if err != nil {
return nil, err
}

table.RangePartitioning = rangePartitioning
}

if v, ok := d.GetOk("clustering"); ok {
table.Clustering = &bigquery.Clustering{
Fields: convertStringArr(v.([]interface{})),
@@ -557,6 +638,12 @@ func resourceBigQueryTableRead(d *schema.ResourceData, meta interface{}) error {
        }
    }

    if res.RangePartitioning != nil {
        if err := d.Set("range_partitioning", flattenRangePartitioning(res.RangePartitioning)); err != nil {
            return err
        }
    }

    if res.Clustering != nil {
        d.Set("clustering", res.Clustering.Fields)
    }
@@ -654,6 +741,9 @@ func expandExternalDataConfiguration(cfg interface{}) (*bigquery.ExternalDataConfiguration, error) {
if v, ok := raw["google_sheets_options"]; ok {
edc.GoogleSheetsOptions = expandGoogleSheetsOptions(v)
}
if v, ok := raw["hive_partitioning_options"]; ok {
edc.HivePartitioningOptions = expandHivePartitioningOptions(v)
}
if v, ok := raw["ignore_unknown_values"]; ok {
edc.IgnoreUnknownValues = v.(bool)
}
@@ -686,6 +776,10 @@ func flattenExternalDataConfiguration(edc *bigquery.ExternalDataConfiguration) (
result["google_sheets_options"] = flattenGoogleSheetsOptions(edc.GoogleSheetsOptions)
}

    if edc.HivePartitioningOptions != nil {
        result["hive_partitioning_options"] = flattenHivePartitioningOptions(edc.HivePartitioningOptions)
    }

    if edc.IgnoreUnknownValues {
        result["ignore_unknown_values"] = edc.IgnoreUnknownValues
    }
@@ -800,6 +894,39 @@ func flattenGoogleSheetsOptions(opts *bigquery.GoogleSheetsOptions) []map[string]interface{} {
    return []map[string]interface{}{result}
}

func expandHivePartitioningOptions(configured interface{}) *bigquery.HivePartitioningOptions {
    if len(configured.([]interface{})) == 0 {
        return nil
    }

    raw := configured.([]interface{})[0].(map[string]interface{})
    opts := &bigquery.HivePartitioningOptions{}

    if v, ok := raw["mode"]; ok {
        opts.Mode = v.(string)
    }

    if v, ok := raw["source_uri_prefix"]; ok {
        opts.SourceUriPrefix = v.(string)
    }

    return opts
}

func flattenHivePartitioningOptions(opts *bigquery.HivePartitioningOptions) []map[string]interface{} {
    result := map[string]interface{}{}

    if opts.Mode != "" {
        result["mode"] = opts.Mode
    }

    if opts.SourceUriPrefix != "" {
        result["source_uri_prefix"] = opts.SourceUriPrefix
    }

    return []map[string]interface{}{result}
}

func expandSchema(raw interface{}) (*bigquery.TableSchema, error) {
    var fields []*bigquery.TableFieldSchema

@@ -842,6 +969,38 @@ func expandTimePartitioning(configured interface{}) *bigquery.TimePartitioning {
    return tp
}

func expandRangePartitioning(configured interface{}) (*bigquery.RangePartitioning, error) {
    if configured == nil {
        return nil, nil
    }

    rpList := configured.([]interface{})
    if len(rpList) == 0 || rpList[0] == nil {
        return nil, errors.New("Error casting range partitioning interface to expected structure")
    }

    rangePartJson := rpList[0].(map[string]interface{})
    rp := &bigquery.RangePartitioning{
        Field: rangePartJson["field"].(string),
    }

    if v, ok := rangePartJson["range"]; ok && v != nil {
        rangeLs := v.([]interface{})
        if len(rangeLs) != 1 || rangeLs[0] == nil {
            return nil, errors.New("Non-empty range must be given for range partitioning")
        }

        rangeJson := rangeLs[0].(map[string]interface{})
        rp.Range = &bigquery.RangePartitioningRange{
            Start:    int64(rangeJson["start"].(int)),
            End:      int64(rangeJson["end"].(int)),
            Interval: int64(rangeJson["interval"].(int)),
        }
    }

    return rp, nil
}

func flattenEncryptionConfiguration(ec *bigquery.EncryptionConfiguration) []map[string]interface{} {
    return []map[string]interface{}{{"kms_key_name": ec.KmsKeyName}}
}
@@ -864,6 +1023,21 @@ func flattenTimePartitioning(tp *bigquery.TimePartitioning) []map[string]interface{} {
    return []map[string]interface{}{result}
}

func flattenRangePartitioning(rp *bigquery.RangePartitioning) []map[string]interface{} {
    result := map[string]interface{}{
        "field": rp.Field,
        "range": []map[string]interface{}{
            {
                "start":    rp.Range.Start,
                "end":      rp.Range.End,
                "interval": rp.Range.Interval,
            },
        },
    }

    return []map[string]interface{}{result}
}

func expandView(configured interface{}) *bigquery.ViewDefinition {
    raw := configured.([]interface{})[0].(map[string]interface{})
    vd := &bigquery.ViewDefinition{Query: raw["query"].(string)}
119 changes: 119 additions & 0 deletions google/resource_bigquery_table_test.go
@@ -64,6 +64,53 @@ func TestAccBigQueryTable_Kms(t *testing.T) {
    })
}

func TestAccBigQueryTable_HivePartitioning(t *testing.T) {
    t.Parallel()
    bucketName := testBucketName(t)
    resourceName := "google_bigquery_table.test"
    datasetID := fmt.Sprintf("tf_test_%s", randString(t, 10))
    tableID := fmt.Sprintf("tf_test_%s", randString(t, 10))

    resource.Test(t, resource.TestCase{
        PreCheck:     func() { testAccPreCheck(t) },
        Providers:    testAccProviders,
        CheckDestroy: testAccCheckBigQueryTableDestroyProducer(t),
        Steps: []resource.TestStep{
            {
                Config: testAccBigQueryTableHivePartitioning(bucketName, datasetID, tableID),
            },
            {
                ResourceName:      resourceName,
                ImportState:       true,
                ImportStateVerify: true,
            },
        },
    })
}

func TestAccBigQueryTable_RangePartitioning(t *testing.T) {
    t.Parallel()
    resourceName := "google_bigquery_table.test"
    datasetID := fmt.Sprintf("tf_test_%s", randString(t, 10))
    tableID := fmt.Sprintf("tf_test_%s", randString(t, 10))

    vcrTest(t, resource.TestCase{
        PreCheck:     func() { testAccPreCheck(t) },
        Providers:    testAccProviders,
        CheckDestroy: testAccCheckBigQueryTableDestroyProducer(t),
        Steps: []resource.TestStep{
            {
                Config: testAccBigQueryTableRangePartitioning(datasetID, tableID),
            },
            {
                ResourceName:      resourceName,
                ImportState:       true,
                ImportStateVerify: true,
            },
        },
    })
}

func TestAccBigQueryTable_View(t *testing.T) {
    t.Parallel()

@@ -331,6 +378,78 @@ EOH
`, datasetID, cryptoKeyName, tableID)
}

func testAccBigQueryTableHivePartitioning(bucketName, datasetID, tableID string) string {
    return fmt.Sprintf(`
resource "google_storage_bucket" "test" {
  name          = "%s"
  force_destroy = true
}

resource "google_storage_bucket_object" "test" {
  name    = "key1=20200330/init.csv"
  content = ";"
  bucket  = google_storage_bucket.test.name
}

resource "google_bigquery_dataset" "test" {
  dataset_id = "%s"
}

resource "google_bigquery_table" "test" {
  table_id   = "%s"
  dataset_id = google_bigquery_dataset.test.dataset_id

  external_data_configuration {
    source_format = "CSV"
    autodetect    = true
    source_uris   = ["gs://${google_storage_bucket.test.name}/*"]

    hive_partitioning_options {
      mode              = "AUTO"
      source_uri_prefix = "gs://${google_storage_bucket.test.name}/"
    }
  }

  depends_on = ["google_storage_bucket_object.test"]
}
`, bucketName, datasetID, tableID)
}

func testAccBigQueryTableRangePartitioning(datasetID, tableID string) string {
    return fmt.Sprintf(`
resource "google_bigquery_dataset" "test" {
  dataset_id = "%s"
}

resource "google_bigquery_table" "test" {
  table_id   = "%s"
  dataset_id = google_bigquery_dataset.test.dataset_id

  range_partitioning {
    field = "id"

    range {
      start    = 1
      end      = 10000
      interval = 100
    }
  }

  schema = <<EOH
[
  {
    "name": "ts",
    "type": "TIMESTAMP"
  },
  {
    "name": "id",
    "type": "INTEGER"
  }
]
EOH
}
`, datasetID, tableID)
}

func testAccBigQueryTableWithView(datasetID, tableID string) string {
    return fmt.Sprintf(`
resource "google_bigquery_dataset" "test" {
