Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

r/aws_glue_crawler Add support for S3 event notifications #21467

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/21467.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_glue_crawler: Add support for S3 event notifications
```
31 changes: 31 additions & 0 deletions internal/service/glue/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,16 @@ func ResourceCrawler() *schema.Resource {
Optional: true,
ValidateFunc: validation.IntBetween(1, 249),
},
"event_queue_arn": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: verify.ValidARN,
},
"dlq_event_queue_arn": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: verify.ValidARN,
},
},
},
},
Expand Down Expand Up @@ -304,6 +314,11 @@ func resourceCrawlerCreate(d *schema.ResourceData, meta interface{}) error {
return resource.RetryableError(err)
}

// InvalidInputException: SQS queue arn:aws:sqs:us-west-2:*******:tf-acc-test-4317277351691904203 does not exist or the role provided does not have access to it.
if tfawserr.ErrMessageContains(err, glue.ErrCodeInvalidInputException, "SQS queue") && tfawserr.ErrMessageContains(err, glue.ErrCodeInvalidInputException, "does not exist or the role provided does not have access to it") {
return resource.RetryableError(err)
}

return resource.NonRetryableError(err)
}
return nil
Expand Down Expand Up @@ -524,6 +539,14 @@ func expandGlueS3Target(cfg map[string]interface{}) *glue.S3Target {
target.SampleSize = aws.Int64(int64(v.(int)))
}

if v, ok := cfg["event_queue_arn"]; ok {
target.EventQueueArn = aws.String(v.(string))
}

if v, ok := cfg["dlq_event_queue_arn"]; ok {
target.DlqEventQueueArn = aws.String(v.(string))
}

return target
}

Expand Down Expand Up @@ -625,6 +648,11 @@ func resourceCrawlerUpdate(d *schema.ResourceData, meta interface{}) error {
return resource.RetryableError(err)
}

// InvalidInputException: SQS queue arn:aws:sqs:us-west-2:*******:tf-acc-test-4317277351691904203 does not exist or the role provided does not have access to it.
if tfawserr.ErrMessageContains(err, glue.ErrCodeInvalidInputException, "SQS queue") && tfawserr.ErrMessageContains(err, glue.ErrCodeInvalidInputException, "does not exist or the role provided does not have access to it") {
return resource.RetryableError(err)
}

return resource.NonRetryableError(err)
}
return nil
Expand Down Expand Up @@ -768,6 +796,9 @@ func flattenGlueS3Targets(s3Targets []*glue.S3Target) []map[string]interface{} {
attrs["sample_size"] = aws.Int64Value(s3Target.SampleSize)
}

attrs["event_queue_arn"] = aws.StringValue(s3Target.EventQueueArn)
attrs["dlq_event_queue_arn"] = aws.StringValue(s3Target.DlqEventQueueArn)

result = append(result, attrs)
}
return result
Expand Down
205 changes: 205 additions & 0 deletions internal/service/glue/crawler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,67 @@ func TestAccGlueCrawler_S3Target_exclusions(t *testing.T) {
})
}

func TestAccGlueCrawler_S3Target_eventqueue(t *testing.T) {
var crawler glue.Crawler
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_glue_crawler.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(t) },
ErrorCheck: acctest.ErrorCheck(t, glue.EndpointsID),
Providers: acctest.Providers,
CheckDestroy: testAccCheckCrawlerDestroy,
Steps: []resource.TestStep{
{
Config: testAccGlueCrawlerConfig_S3Target_EventQueue(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckCrawlerExists(resourceName, &crawler),
acctest.CheckResourceAttrRegionalARN(resourceName, "arn", "glue", fmt.Sprintf("crawler/%s", rName)),
resource.TestCheckResourceAttr(resourceName, "s3_target.#", "1"),
acctest.CheckResourceAttrRegionalARN(resourceName, "s3_target.0.event_queue_arn", "sqs", rName),
resource.TestCheckResourceAttr(resourceName, "recrawl_policy.0.recrawl_behavior", "CRAWL_EVENT_MODE"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
},
})
}

func TestAccGlueCrawler_S3Target_dlqeventqueue(t *testing.T) {
var crawler glue.Crawler
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_glue_crawler.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(t) },
ErrorCheck: acctest.ErrorCheck(t, glue.EndpointsID),
Providers: acctest.Providers,
CheckDestroy: testAccCheckCrawlerDestroy,
Steps: []resource.TestStep{
{
Config: testAccGlueCrawlerConfig_S3Target_DlqEventQueue(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckCrawlerExists(resourceName, &crawler),
acctest.CheckResourceAttrRegionalARN(resourceName, "arn", "glue", fmt.Sprintf("crawler/%s", rName)),
resource.TestCheckResourceAttr(resourceName, "s3_target.#", "1"),
acctest.CheckResourceAttrRegionalARN(resourceName, "s3_target.0.event_queue_arn", "sqs", rName),
acctest.CheckResourceAttrRegionalARN(resourceName, "s3_target.0.dlq_event_queue_arn", "sqs", fmt.Sprintf("%sdlq", rName)),
resource.TestCheckResourceAttr(resourceName, "recrawl_policy.0.recrawl_behavior", "CRAWL_EVENT_MODE"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
},
})
}

func TestAccGlueCrawler_S3Target_multiple(t *testing.T) {
var crawler glue.Crawler
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
Expand Down Expand Up @@ -2099,6 +2160,150 @@ resource "aws_glue_crawler" "test" {
`, rName, exclusion1, exclusion2)
}

func testAccGlueCrawlerConfig_S3Target_EventQueue(rName string) string {
return testAccGlueCrawlerConfig_Base(rName) + fmt.Sprintf(`
resource "aws_glue_catalog_database" "test" {
name = %[1]q
}

resource "aws_s3_bucket" "test" {
bucket = %[1]q
force_destroy = true
}

resource "aws_sqs_queue" "test" {
name = %[1]q

visibility_timeout_seconds = 3600
}

resource "aws_iam_role_policy" "test_sqs" {
role = aws_iam_role.test.name

policy = data.aws_iam_policy_document.role_test_sqs.json
}

data "aws_iam_policy_document" "role_test_sqs" {
statement {
effect = "Allow"

actions = [
"sqs:DeleteMessage",
"sqs:GetQueueUrl",
"sqs:ListDeadLetterSourceQueues",
"sqs:DeleteMessageBatch",
"sqs:ReceiveMessage",
"sqs:GetQueueAttributes",
"sqs:ListQueueTags",
"sqs:SetQueueAttributes",
"sqs:PurgeQueue",
]

resources = [
aws_sqs_queue.test.arn,
]
}
}

resource "aws_glue_crawler" "test" {
depends_on = [
aws_iam_role_policy_attachment.test-AWSGlueServiceRole,
aws_iam_role_policy.test_sqs,
]

database_name = aws_glue_catalog_database.test.name
name = %[1]q
role = aws_iam_role.test.name

s3_target {
path = "s3://${aws_s3_bucket.test.bucket}"

event_queue_arn = aws_sqs_queue.test.arn
}

recrawl_policy {
recrawl_behavior = "CRAWL_EVENT_MODE"
}
}
`, rName)
}

func testAccGlueCrawlerConfig_S3Target_DlqEventQueue(rName string) string {
return testAccGlueCrawlerConfig_Base(rName) + fmt.Sprintf(`
resource "aws_glue_catalog_database" "test" {
name = %[1]q
}

resource "aws_s3_bucket" "test" {
bucket = %[1]q
force_destroy = true
}

resource "aws_sqs_queue" "test" {
name = %[1]q

visibility_timeout_seconds = 3600
}

resource "aws_sqs_queue" "test_dlq" {
name = "%[1]sdlq"

visibility_timeout_seconds = 3600
}

resource "aws_iam_role_policy" "test_sqs" {
role = aws_iam_role.test.name

policy = data.aws_iam_policy_document.role_test_sqs.json
}

data "aws_iam_policy_document" "role_test_sqs" {
statement {
effect = "Allow"

actions = [
"sqs:DeleteMessage",
"sqs:GetQueueUrl",
"sqs:ListDeadLetterSourceQueues",
"sqs:DeleteMessageBatch",
"sqs:ReceiveMessage",
"sqs:GetQueueAttributes",
"sqs:ListQueueTags",
"sqs:SetQueueAttributes",
"sqs:PurgeQueue",
]

resources = [
aws_sqs_queue.test_dlq.arn,
aws_sqs_queue.test.arn,
]
}
}

resource "aws_glue_crawler" "test" {
depends_on = [
aws_iam_role_policy_attachment.test-AWSGlueServiceRole,
aws_iam_role_policy.test_sqs,
]

database_name = aws_glue_catalog_database.test.name
name = %[1]q
role = aws_iam_role.test.name

s3_target {
path = "s3://${aws_s3_bucket.test.bucket}"

event_queue_arn = aws_sqs_queue.test.arn
dlq_event_queue_arn = aws_sqs_queue.test_dlq.arn
}

recrawl_policy {
recrawl_behavior = "CRAWL_EVENT_MODE"
}
}
`, rName)
}

func testAccGlueCrawlerConfig_S3Target_Multiple(rName, path1, path2 string) string {
return testAccGlueCrawlerConfig_Base(rName) + fmt.Sprintf(`
resource "aws_glue_catalog_database" "test" {
Expand Down
4 changes: 3 additions & 1 deletion website/docs/r/glue_crawler.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ The following arguments are supported:
* `path` - (Required) The path to the Amazon S3 target.
* `connection_name` - (Optional) The name of a connection which allows crawler to access data in S3 within a VPC.
* `exclusions` - (Optional) A list of glob patterns used to exclude from the crawl.
* `sample_size` - (Optional) Sets the number of files in each leaf folder to be crawled when crawling sample files in a dataset. If not set, all the files are crawled. A valid value is an integer between 1 and 249.
* `sample_size` - (Optional) Sets the number of files in each leaf folder to be crawled when crawling sample files in a dataset. If not set, all the files are crawled. A valid value is an integer between 1 and 249.
* `event_queue_arn` - (Optional) The ARN of the SQS queue to receive S3 notifications from.
* `dlq_event_queue_arn` - (Optional) The ARN of the dead-letter SQS queue.

### Catalog Target

Expand Down