Skip to content

Commit

Permalink
Merge pull request #18203 from brunhil/b-aws_lakeformation_permission…
Browse files Browse the repository at this point in the history
…s-select-bugfix

r/aws_lakeformation_permissions: Fix bug where SELECT permissions are not properly read in from AWS
  • Loading branch information
YakDriver authored Mar 18, 2021
2 parents e330c1f + 2e9bb88 commit 476ec32
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 35 deletions.
3 changes: 3 additions & 0 deletions .changelog/18203.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
resource/aws_lakeformation_permissions: Properly serialize SELECT permission for `permissions` and `permissions_with_grant_option` fields
```
134 changes: 106 additions & 28 deletions aws/resource_aws_lakeformation_permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ func resourceAwsLakeFormationPermissionsRead(d *schema.ResourceData, meta interf

input.Resource = expandLakeFormationResource(d, true)
matchResource := expandLakeFormationResource(d, false)
// AWS treats SELECT permissions differently. A separate resource is created for the {db}.{table}.* to grant select on all columns
selectPermissionsResource := expandLakeFormationResourceForSelectPermissions(d)

log.Printf("[DEBUG] Reading Lake Formation permissions: %v", input)
var principalResourcePermissions []*lakeformation.PrincipalResourcePermissions
Expand All @@ -270,6 +272,12 @@ func resourceAwsLakeFormationPermissionsRead(d *schema.ResourceData, meta interf

if resourceAwsLakeFormationPermissionsCompareResource(*matchResource, *permission.Resource) {
principalResourcePermissions = append(principalResourcePermissions, permission)
continue
}

// AWS treats SELECT permissions differently. A separate resource is created for the {db}.{table}.* to grant select on all columns
if selectPermissionsResource != nil && resourceAwsLakeFormationPermissionsCompareResource(*selectPermissionsResource, *permission.Resource) {
principalResourcePermissions = append(principalResourcePermissions, permission)
}
}
return !lastPage
Expand All @@ -293,6 +301,12 @@ func resourceAwsLakeFormationPermissionsRead(d *schema.ResourceData, meta interf

if resourceAwsLakeFormationPermissionsCompareResource(*matchResource, *permission.Resource) {
principalResourcePermissions = append(principalResourcePermissions, permission)
continue
}

// AWS treats SELECT permissions differently. A separate resource is created for the {db}.{table}.* to grant select on all columns
if selectPermissionsResource != nil && resourceAwsLakeFormationPermissionsCompareResource(*selectPermissionsResource, *permission.Resource) {
principalResourcePermissions = append(principalResourcePermissions, permission)
}
}
return !lastPage
Expand All @@ -309,42 +323,43 @@ func resourceAwsLakeFormationPermissionsRead(d *schema.ResourceData, meta interf
return fmt.Errorf("error reading Lake Formation permissions: %w", err)
}

if len(principalResourcePermissions) > 1 {
if len(principalResourcePermissions) == 0 {
return fmt.Errorf("error reading Lake Formation permissions: %s", "no permissions found")
}

if len(principalResourcePermissions) > 2 {
return fmt.Errorf("error reading Lake Formation permissions: %s", "multiple permissions found for same resource")
}

for _, permissions := range principalResourcePermissions {
d.Set("principal", permissions.Principal.DataLakePrincipalIdentifier)
d.Set("permissions", permissions.Permissions)
d.Set("permissions_with_grant_option", permissions.PermissionsWithGrantOption)
d.Set("principal", principalResourcePermissions[0].Principal.DataLakePrincipalIdentifier)
d.Set("permissions", flattenLakeFormationPermissions(principalResourcePermissions))
d.Set("permissions_with_grant_option", flattenLakeFormationGrantPermissions(principalResourcePermissions))

if permissions.Resource.Catalog != nil {
d.Set("catalog_resource", true)
}

if permissions.Resource.DataLocation != nil {
d.Set("data_location", []interface{}{flattenLakeFormationDataLocationResource(permissions.Resource.DataLocation)})
} else {
d.Set("data_location", nil)
}
if principalResourcePermissions[0].Resource.Catalog != nil {
d.Set("catalog_resource", true)
}

if permissions.Resource.Database != nil {
d.Set("database", []interface{}{flattenLakeFormationDatabaseResource(permissions.Resource.Database)})
} else {
d.Set("database", nil)
}
if principalResourcePermissions[0].Resource.DataLocation != nil {
d.Set("data_location", []interface{}{flattenLakeFormationDataLocationResource(principalResourcePermissions[0].Resource.DataLocation)})
} else {
d.Set("data_location", nil)
}

// table with columns permissions will include the table and table with columns
if permissions.Resource.TableWithColumns != nil {
d.Set("table_with_columns", []interface{}{flattenLakeFormationTableWithColumnsResource(permissions.Resource.TableWithColumns)})
} else if permissions.Resource.Table != nil {
d.Set("table_with_columns", nil)
d.Set("table", []interface{}{flattenLakeFormationTableResource(permissions.Resource.Table)})
} else {
d.Set("table", nil)
}
if principalResourcePermissions[0].Resource.Database != nil {
d.Set("database", []interface{}{flattenLakeFormationDatabaseResource(principalResourcePermissions[0].Resource.Database)})
} else {
d.Set("database", nil)
}

// table with columns permissions will include the table and table with columns
if principalResourcePermissions[0].Resource.TableWithColumns != nil {
d.Set("table_with_columns", []interface{}{flattenLakeFormationTableWithColumnsResource(principalResourcePermissions[0].Resource.TableWithColumns)})
} else if principalResourcePermissions[0].Resource.Table != nil {
d.Set("table_with_columns", nil)
d.Set("table", []interface{}{flattenLakeFormationTableResource(principalResourcePermissions[0].Resource.Table)})
} else {
d.Set("table", nil)
}
return nil
}

Expand Down Expand Up @@ -464,6 +479,37 @@ func expandLakeFormationResource(d *schema.ResourceData, squashTableWithColumns
return res
}

func expandLakeFormationResourceForSelectPermissions(d *schema.ResourceData) *lakeformation.Resource {
tableMapSchema := d.Get("table").([]interface{})
if len(tableMapSchema) == 0 {
return nil
}

tableSchema := tableMapSchema[0].(map[string]interface{})
if tableSchema == nil {
return nil
}

databaseName, ok := tableSchema["database_name"].(string)
if !ok {
return nil
}
name, ok := tableSchema["name"].(string)
if !ok {
return nil
}

res := &lakeformation.Resource{
TableWithColumns: &lakeformation.TableWithColumnsResource{
DatabaseName: aws.String(databaseName),
Name: aws.String(name),
ColumnWildcard: &lakeformation.ColumnWildcard{}, // A wildcard is used for SELECT permissions
},
}

return res
}

func expandLakeFormationDataLocationResource(tfMap map[string]interface{}) *lakeformation.DataLocationResource {
if tfMap == nil {
return nil
Expand Down Expand Up @@ -647,3 +693,35 @@ func flattenLakeFormationTableWithColumnsResource(apiObject *lakeformation.Table

return tfMap
}

func flattenLakeFormationPermissions(apiObjects []*lakeformation.PrincipalResourcePermissions) []string {
if apiObjects == nil {
return nil
}

tfList := make([]string, 0)

for _, resourcePermission := range apiObjects {
for _, permission := range resourcePermission.Permissions {
tfList = append(tfList, aws.StringValue(permission))
}
}

return tfList
}

func flattenLakeFormationGrantPermissions(apiObjects []*lakeformation.PrincipalResourcePermissions) []string {
if apiObjects == nil {
return nil
}

tfList := make([]string, 0)

for _, resourcePermission := range apiObjects {
for _, grantPermission := range resourcePermission.PermissionsWithGrantOption {
tfList = append(tfList, aws.StringValue(grantPermission))
}
}

return tfList
}
93 changes: 93 additions & 0 deletions aws/resource_aws_lakeformation_permissions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,29 @@ func testAccAWSLakeFormationPermissions_tableWithColumnsAndTable(t *testing.T) {
})
}

func testAccAWSLakeFormationPermissions_selectPermissions(t *testing.T) {
rName := acctest.RandomWithPrefix("tf-acc-test")
resourceName := "aws_lakeformation_permissions.test"
roleName := "aws_iam_role.test"

resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t); testAccPartitionHasServicePreCheck(lakeformation.EndpointsID, t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckAWSLakeFormationPermissionsDestroy,
Steps: []resource.TestStep{
{
Config: testAccAWSLakeFormationPermissionsConfig_selectPermissions(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckAWSLakeFormationPermissionsExists(resourceName),
resource.TestCheckResourceAttrPair(resourceName, "principal", roleName, "arn"),
resource.TestCheckResourceAttr(resourceName, "permissions.#", "7"),
resource.TestCheckResourceAttr(resourceName, "permissions_with_grant_option.#", "7"),
),
},
},
})
}

func testAccCheckAWSLakeFormationPermissionsDestroy(s *terraform.State) error {
conn := testAccProvider.Meta().(*AWSClient).lakeformationconn

Expand Down Expand Up @@ -735,3 +758,73 @@ resource "aws_lakeformation_permissions" "test" {
}
`, rName)
}

func testAccAWSLakeFormationPermissionsConfig_selectPermissions(rName string) string {
return fmt.Sprintf(`
data "aws_partition" "current" {}
resource "aws_iam_role" "test" {
name = %[1]q
path = "/"
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Principal": {
"Service": "glue.${data.aws_partition.current.dns_suffix}"
},
"Effect": "Allow",
"Sid": ""
}
]
}
EOF
}
data "aws_caller_identity" "current" {}
resource "aws_glue_catalog_database" "test" {
name = %[1]q
}
resource "aws_glue_catalog_table" "test" {
name = %[1]q
database_name = aws_glue_catalog_database.test.name
storage_descriptor {
columns {
name = "event"
type = "string"
}
columns {
name = "timestamp"
type = "date"
}
columns {
name = "value"
type = "double"
}
}
}
resource "aws_lakeformation_data_lake_settings" "test" {
# this will result in multiple permissions for iam role
admins = [aws_iam_role.test.arn, data.aws_caller_identity.current.arn]
}
resource "aws_lakeformation_permissions" "test" {
principal = aws_iam_role.test.arn
permissions = ["ALL", "ALTER", "DELETE", "DESCRIBE", "DROP", "INSERT", "SELECT"]
permissions_with_grant_option = ["ALL", "ALTER", "DELETE", "DESCRIBE", "DROP", "INSERT", "SELECT"]
table {
database_name = aws_glue_catalog_table.test.database_name
name = aws_glue_catalog_table.test.name
}
}
`, rName)
}
19 changes: 12 additions & 7 deletions aws/resource_aws_lakeformation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ func TestAccAWSLakeFormation_serial(t *testing.T) {
"dataSource": testAccAWSLakeFormationDataLakeSettingsDataSource_basic,
},
"Permissions": {
"basic": testAccAWSLakeFormationPermissions_basic,
"dataLocation": testAccAWSLakeFormationPermissions_dataLocation,
"database": testAccAWSLakeFormationPermissions_database,
"tableName": testAccAWSLakeFormationPermissions_table_name,
"tableWildcard": testAccAWSLakeFormationPermissions_table_wildcard,
"tableWithColumns": testAccAWSLakeFormationPermissions_tableWithColumns,
"tableWithColumnsAndTable": testAccAWSLakeFormationPermissions_tableWithColumnsAndTable,
"basic": testAccAWSLakeFormationPermissions_basic,
"dataLocation": testAccAWSLakeFormationPermissions_dataLocation,
"database": testAccAWSLakeFormationPermissions_database,
"selectPermissions": testAccAWSLakeFormationPermissions_selectPermissions,
},
"TablePermissions": {
"tableName": testAccAWSLakeFormationPermissions_table_name,
"tableWildcard": testAccAWSLakeFormationPermissions_table_wildcard,
"tableWithColumns": testAccAWSLakeFormationPermissions_tableWithColumns,
"tableWithColumnsAndTable": testAccAWSLakeFormationPermissions_tableWithColumnsAndTable,
},
"DataSourcePermissions": {
"basicDataSource": testAccAWSLakeFormationPermissionsDataSource_basic,
"dataLocationDataSource": testAccAWSLakeFormationPermissionsDataSource_dataLocation,
"databaseDataSource": testAccAWSLakeFormationPermissionsDataSource_database,
Expand Down

0 comments on commit 476ec32

Please sign in to comment.