From a9a3ea183694bb9f98ed158c0dd4c3a323d35d1a Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Fri, 9 Jun 2023 11:02:42 +0200 Subject: [PATCH 01/11] Test integration tests on localstack --- x-pack/filebeat/docker-compose.yml | 18 + .../input/awss3/_meta/terraform/main.tf | 41 +- x-pack/filebeat/input/awss3/input.go | 42 +- .../input_localstack_integration_test.go | 490 ++++++++++++++++++ x-pack/filebeat/input/awss3/testdata/test.xml | 2 + 5 files changed, 566 insertions(+), 27 deletions(-) create mode 100644 x-pack/filebeat/input/awss3/input_localstack_integration_test.go create mode 100644 x-pack/filebeat/input/awss3/testdata/test.xml diff --git a/x-pack/filebeat/docker-compose.yml b/x-pack/filebeat/docker-compose.yml index 5c98c9b459c..d4733b8dd70 100644 --- a/x-pack/filebeat/docker-compose.yml +++ b/x-pack/filebeat/docker-compose.yml @@ -24,3 +24,21 @@ services: hostname: cometd ports: - 8080:8080 + + localstack: + container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}" + image: localstack/localstack + hostname: localhost + ports: + - "4566:4566" # LocalStack Gateway + environment: + - SERVICES=sqs,sns,secretsmanager + - DEBUG=1 + - DOCKER_HOST=unix:///var/run/docker.sock + - HOST_TMP_FOLDER=${TMPDIR} + - HOSTNAME_EXTERNAL=localhost + - S3_HOSTNAME=localhost + - PROVIDER_OVERRIDE_S3=asf + volumes: + - "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack" + - "/var/run/docker.sock:/var/run/docker.sock" diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/main.tf b/x-pack/filebeat/input/awss3/_meta/terraform/main.tf index b195cb2ff6b..8105a4beee0 100644 --- a/x-pack/filebeat/input/awss3/_meta/terraform/main.tf +++ b/x-pack/filebeat/input/awss3/_meta/terraform/main.tf @@ -8,15 +8,38 @@ terraform { } provider "aws" { - region = var.aws_region - default_tags { - tags = { - environment = var.ENVIRONMENT - repo = var.REPO - branch = var.BRANCH - build = var.BUILD_ID - created_date = var.CREATED_DATE - } + access_key = "bharat" + secret_key = "bharat" + region = "us-east-1" + s3_use_path_style = true + skip_credentials_validation = true + skip_metadata_api_check = true + skip_requesting_account_id = true + + endpoints { + apigateway = "http://localhost:4566" + apigatewayv2 = "http://localhost:4566" + cloudformation = "http://localhost:4566" + cloudwatch = "http://localhost:4566" + dynamodb = "http://localhost:4566" + ec2 = "http://localhost:4566" + es = "http://localhost:4566" + elasticache = "http://localhost:4566" + firehose = "http://localhost:4566" + iam = "http://localhost:4566" + kinesis = "http://localhost:4566" + lambda = "http://localhost:4566" + rds = "http://localhost:4566" + redshift = "http://localhost:4566" + route53 = "http://localhost:4566" + s3 = "http://localhost:4566" + secretsmanager = "http://localhost:4566" + ses = "http://localhost:4566" + sns = "http://localhost:4566" + sqs = "http://localhost:4566" + ssm = "http://localhost:4566" + stepfunctions = "http://localhost:4566" + sts = "http://localhost:4566" } } diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 4f45f914144..2d2eea3ded5 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -189,6 +189,7 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s client: s3.NewFromConfig(in.awsConfig, func(o *s3.Options) { if in.config.AWSConfig.FIPSEnabled { o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled + o.UsePathStyle = in.config.PathStyle } }), } @@ -247,29 +248,34 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli if in.config.AWSConfig.FIPSEnabled { o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled } - o.UsePathStyle = in.config.PathStyle + o.UsePathStyle = true }) - regionName, err := getRegionForBucket(cancelCtx, s3Client, bucketName) - if err != nil { - return nil, fmt.Errorf("failed to get AWS region for bucket: %w", err) - } - originalAwsConfigRegion := in.awsConfig.Region + // regionName, err := getRegionForBucket(cancelCtx, s3Client, bucketName) + // if err != nil { + // return nil, fmt.Errorf("failed to get AWS region for bucket: %w", err) + // } - in.awsConfig.Region = regionName + // originalAwsConfigRegion := in.awsConfig.Region - if regionName != originalAwsConfigRegion { - s3Client = s3.NewFromConfig(in.awsConfig, func(o *s3.Options) { - if in.config.NonAWSBucketName != "" { - o.EndpointResolver = nonAWSBucketResolver{endpoint: in.config.AWSConfig.Endpoint} - } + // ctx.Logger.Errorf("**ORIGINAL*** %s", originalAwsConfigRegion) - if in.config.AWSConfig.FIPSEnabled { - o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled - } - o.UsePathStyle = in.config.PathStyle - }) - } + // in.awsConfig.Region = regionName + + // ctx.Logger.Errorf("***** %s", regionName) + + // if regionName != originalAwsConfigRegion { + // s3Client = s3.NewFromConfig(in.awsConfig, func(o *s3.Options) { + // if in.config.NonAWSBucketName != "" { + // o.EndpointResolver = nonAWSBucketResolver{endpoint: in.config.AWSConfig.Endpoint} + // } + + // if in.config.AWSConfig.FIPSEnabled { + // o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled + // } + // o.UsePathStyle = true + // }) + // } s3API := &awsS3API{ client: s3Client, diff --git a/x-pack/filebeat/input/awss3/input_localstack_integration_test.go b/x-pack/filebeat/input/awss3/input_localstack_integration_test.go new file mode 100644 index 00000000000..34507b616ce --- /dev/null +++ b/x-pack/filebeat/input/awss3/input_localstack_integration_test.go @@ -0,0 +1,490 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// See _meta/terraform/README.md for integration test usage instructions. + +//go:build integration + +package awss3 + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "os" + "path" + "path/filepath" + "runtime" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + awsConfig "github.com/aws/aws-sdk-go-v2/config" + s3manager "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/stretchr/testify/assert" + "golang.org/x/sync/errgroup" + "gopkg.in/yaml.v2" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" +) + +const ( + inputID = "test_id" +) + +const ( + terraformOutputYML = "_meta/terraform/outputs.yml" +) + +type terraformOutputData struct { + AWSRegion string `yaml:"aws_region"` + BucketName string `yaml:"bucket_name"` + QueueURL string `yaml:"queue_url"` + BucketNameForSNS string `yaml:"bucket_name_for_sns"` + QueueURLForSNS string `yaml:"queue_url_for_sns"` +} + +func getTerraformOutputs(t *testing.T) terraformOutputData { + t.Helper() + + _, filename, _, _ := runtime.Caller(0) + ymlData, err := ioutil.ReadFile(path.Join(path.Dir(filename), terraformOutputYML)) + if os.IsNotExist(err) { + t.Skipf("Run 'terraform apply' in %v to setup S3 and SQS for the test.", filepath.Dir(terraformOutputYML)) + } + if err != nil { + t.Fatalf("failed reading terraform output data: %v", err) + } + + var rtn terraformOutputData + dec := yaml.NewDecoder(bytes.NewReader(ymlData)) + dec.SetStrict(true) + if err = dec.Decode(&rtn); err != nil { + t.Fatal(err) + } + + return rtn +} + +func makeTestConfigS3(s3bucket string) *conf.C { + return conf.MustNewConfigFrom(fmt.Sprintf(`--- +bucket_arn: aws:s3:::%s +region: us-east-1 +number_of_workers: 2 +file_selectors: +- + regex: 'events-array.json$' + expand_event_list_from_field: Events + include_s3_metadata: + - last-modified + - x-amz-version-id + - x-amz-storage-class + - Content-Length + - Content-Type +- + regex: '\.(?:nd)?json(\.gz)?$' +- + regex: 'multiline.txt$' + parsers: + - multiline: + pattern: "^ + \ No newline at end of file From 0adb357b1f8c6b4d502851d4083b549d4fdc3c00 Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Mon, 10 Jul 2023 09:17:27 +0200 Subject: [PATCH 02/11] Fix localstack integration test --- x-pack/filebeat/docker-compose.yml | 4 +- .../input/awss3/_meta/terraform/.gitignore | 2 +- .../input/awss3/_meta/terraform/localstack.tf | 90 ++++ .../input/awss3/_meta/terraform/main.tf | 41 +- .../input/awss3/_meta/terraform/outputs.tf | 10 + x-pack/filebeat/input/awss3/input.go | 44 +- .../input/awss3/input_integration_test.go | 171 +++++- .../input_localstack_integration_test.go | 490 ------------------ x-pack/filebeat/input/awss3/testdata/test.xml | 2 - 9 files changed, 278 insertions(+), 576 deletions(-) create mode 100644 x-pack/filebeat/input/awss3/_meta/terraform/localstack.tf delete mode 100644 x-pack/filebeat/input/awss3/input_localstack_integration_test.go delete mode 100644 x-pack/filebeat/input/awss3/testdata/test.xml diff --git a/x-pack/filebeat/docker-compose.yml b/x-pack/filebeat/docker-compose.yml index d4733b8dd70..236a712ef07 100644 --- a/x-pack/filebeat/docker-compose.yml +++ b/x-pack/filebeat/docker-compose.yml @@ -7,6 +7,7 @@ services: depends_on: elasticsearch: { condition: service_healthy } cometd: { condition: service_healthy } + localstack: { condition: service_healthy } elasticsearch: extends: @@ -26,13 +27,12 @@ services: - 8080:8080 localstack: - container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}" image: localstack/localstack hostname: localhost ports: - "4566:4566" # LocalStack Gateway environment: - - SERVICES=sqs,sns,secretsmanager + - SERVICES=s3,sqs,sns,secretsmanager - DEBUG=1 - DOCKER_HOST=unix:///var/run/docker.sock - HOST_TMP_FOLDER=${TMPDIR} diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/.gitignore b/x-pack/filebeat/input/awss3/_meta/terraform/.gitignore index 0825744a776..1af7b09a151 100644 --- a/x-pack/filebeat/input/awss3/_meta/terraform/.gitignore +++ b/x-pack/filebeat/input/awss3/_meta/terraform/.gitignore @@ -1,3 +1,3 @@ terraform/ -outputs.yml +outputs*.yml *.tfstate* diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/localstack.tf b/x-pack/filebeat/input/awss3/_meta/terraform/localstack.tf new file mode 100644 index 00000000000..edef6fb6770 --- /dev/null +++ b/x-pack/filebeat/input/awss3/_meta/terraform/localstack.tf @@ -0,0 +1,90 @@ +provider "aws" { + alias = "localstack" + access_key = "bharat" + secret_key = "bharat" + region = "us-east-1" + s3_use_path_style = true + skip_credentials_validation = true + skip_metadata_api_check = true + skip_requesting_account_id = true +// s3_force_path_style = true + + endpoints { + apigateway = "http://localhost:4566" + apigatewayv2 = "http://localhost:4566" + cloudformation = "http://localhost:4566" + cloudwatch = "http://localhost:4566" + dynamodb = "http://localhost:4566" + ec2 = "http://localhost:4566" + es = "http://localhost:4566" + elasticache = "http://localhost:4566" + firehose = "http://localhost:4566" + iam = "http://localhost:4566" + kinesis = "http://localhost:4566" + lambda = "http://localhost:4566" + rds = "http://localhost:4566" + redshift = "http://localhost:4566" + route53 = "http://localhost:4566" + s3 = "http://localhost:4566" + secretsmanager = "http://localhost:4566" + ses = "http://localhost:4566" + sns = "http://localhost:4566" + sqs = "http://localhost:4566" + ssm = "http://localhost:4566" + stepfunctions = "http://localhost:4566" + sts = "http://localhost:4566" + } +} + +resource "random_string" "random_localstack" { + length = 6 + special = false + upper = false +} + +resource "aws_s3_bucket" "filebeat-integtest-localstack" { + provider = aws.localstack + bucket = "filebeat-s3-integtest-localstack-${random_string.random_localstack.result}" + force_destroy = true +} + +resource "aws_sqs_queue" "filebeat-integtest-localstack" { + provider = aws.localstack + name = "filebeat-sqs-integtest-localstack-${random_string.random_localstack.result}" + policy = < - \ No newline at end of file From 58d76dee4c5498aad39e12169714bc4d15374542 Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Mon, 10 Jul 2023 16:43:33 +0200 Subject: [PATCH 03/11] Add localstack container in CI --- .ci/jobs/docker-compose.yml | 25 +++++++++++++++++++++++++ .ci/scripts/docker-services-cleanup.sh | 7 +++++++ .ci/scripts/install-docker-services.sh | 7 +++++++ Jenkinsfile | 8 ++++++++ 4 files changed, 47 insertions(+) create mode 100644 .ci/jobs/docker-compose.yml create mode 100755 .ci/scripts/docker-services-cleanup.sh create mode 100755 .ci/scripts/install-docker-services.sh diff --git a/.ci/jobs/docker-compose.yml b/.ci/jobs/docker-compose.yml new file mode 100644 index 00000000000..11f43b37a24 --- /dev/null +++ b/.ci/jobs/docker-compose.yml @@ -0,0 +1,25 @@ +version: '2.3' +services: + # This is a proxy used to block beats until all services are healthy. + # See: https://github.com/docker/compose/issues/4369 + proxy_dep: + image: busybox + depends_on: + localstack: { condition: service_healthy } + + localstack: + image: localstack/localstack + hostname: localhost + ports: + - "4566:4566" # LocalStack Gateway + environment: + - SERVICES=s3,sqs,sns,secretsmanager + - DEBUG=1 + - DOCKER_HOST=unix:///var/run/docker.sock + - HOST_TMP_FOLDER=${TMPDIR} + - HOSTNAME_EXTERNAL=localhost + - S3_HOSTNAME=localhost + - PROVIDER_OVERRIDE_S3=asf + volumes: + - "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack" + - "/var/run/docker.sock:/var/run/docker.sock" diff --git a/.ci/scripts/docker-services-cleanup.sh b/.ci/scripts/docker-services-cleanup.sh new file mode 100755 index 00000000000..1f718d63d19 --- /dev/null +++ b/.ci/scripts/docker-services-cleanup.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +set -exuo pipefail + +${HOME}/bin/docker-compose -f .ci/jobs/docker-compose.yml down -v + +exit 0 diff --git a/.ci/scripts/install-docker-services.sh b/.ci/scripts/install-docker-services.sh new file mode 100755 index 00000000000..c627db1cc15 --- /dev/null +++ b/.ci/scripts/install-docker-services.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +set -exuo pipefail + +${HOME}/bin/docker-compose -f .ci/jobs/docker-compose.yml up -d + +exit 0 diff --git a/Jenkinsfile b/Jenkinsfile index d9192e1e1ad..e0f213882fc 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -920,6 +920,8 @@ def startCloudTestEnv(Map args = [:]) { stage("${name}-prepare-cloud-env"){ withBeatsEnv(archive: false, withModule: false) { try { + // Run the docker compose file to setup the emulated cloud environment + sh(label: 'Run docker-compose services for emulated cloud env', script: ".ci/scripts/install-docker-services.sh ", returnStatus: true) dirs?.each { folder -> retryWithSleep(retries: 2, seconds: 5, backoff: true){ terraformApply(folder) @@ -930,6 +932,9 @@ def startCloudTestEnv(Map args = [:]) { // If it failed then cleanup without failing the build sh(label: 'Terraform Cleanup', script: ".ci/scripts/terraform-cleanup.sh ${folder}", returnStatus: true) } + // Cleanup the docker services + sh(label: 'Docker Compose Cleanup', script: ".ci/scripts/docker-services-cleanup.sh", returnStatus: true) + error('startCloudTestEnv: terraform apply failed.') } finally { // Archive terraform states in case manual cleanup is needed. @@ -960,6 +965,7 @@ def terraformApply(String directory) { * Tear down the terraform environments, by looking for all terraform states in directory * then it runs terraform destroy for each one. * It uses terraform states previously stashed by startCloudTestEnv. +* This also tears down any associated docker services */ def terraformCleanup(Map args = [:]) { String name = normalise(args.name) @@ -970,6 +976,8 @@ def terraformCleanup(Map args = [:]) { retryWithSleep(retries: 2, seconds: 5, backoff: true) { sh(label: "Terraform Cleanup", script: ".ci/scripts/terraform-cleanup.sh ${directory}") } + // Cleanup associated docker services + sh(label: 'Docker Compose Cleanup', script: ".ci/scripts/docker-services-cleanup.sh", returnStatus: true) } } } From c85983fac3a4b64e0a4e0bcae5e3f99194ff7959 Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Mon, 17 Jul 2023 15:20:00 +0200 Subject: [PATCH 04/11] Adapt to PR 36054 --- .ci/jobs/docker-compose.yml | 23 ------------- .ci/scripts/docker-services-cleanup.sh | 7 ---- .ci/scripts/install-docker-services.sh | 7 ---- Jenkinsfile | 46 +++++++++++++------------- x-pack/filebeat/docker-compose.yml | 16 +++++++++ 5 files changed, 39 insertions(+), 60 deletions(-) delete mode 100644 .ci/jobs/docker-compose.yml delete mode 100755 .ci/scripts/docker-services-cleanup.sh delete mode 100755 .ci/scripts/install-docker-services.sh diff --git a/.ci/jobs/docker-compose.yml b/.ci/jobs/docker-compose.yml deleted file mode 100644 index 06de1acb91e..00000000000 --- a/.ci/jobs/docker-compose.yml +++ /dev/null @@ -1,23 +0,0 @@ -version: '2.3' -services: - # This is a proxy used to block beats until all services are healthy. - # See: https://github.com/docker/compose/issues/4369 - proxy_dep: - image: busybox - depends_on: - localstack: { condition: service_healthy } - - localstack: - container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}" - image: localstack/localstack:2.1.0 - ports: - - "127.0.0.1:4566:4566" # LocalStack Gateway - environment: - - DEBUG=1 - - DOCKER_HOST=unix:///var/run/docker.sock - - LOCALSTACK_HOST=localhost - - S3_HOSTNAME=localhost - - PROVIDER_OVERRIDE_S3=asf - volumes: - - "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack" - - "/var/run/docker.sock:/var/run/docker.sock" diff --git a/.ci/scripts/docker-services-cleanup.sh b/.ci/scripts/docker-services-cleanup.sh deleted file mode 100755 index 1f718d63d19..00000000000 --- a/.ci/scripts/docker-services-cleanup.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env bash - -set -exuo pipefail - -${HOME}/bin/docker-compose -f .ci/jobs/docker-compose.yml down -v - -exit 0 diff --git a/.ci/scripts/install-docker-services.sh b/.ci/scripts/install-docker-services.sh deleted file mode 100755 index c627db1cc15..00000000000 --- a/.ci/scripts/install-docker-services.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env bash - -set -exuo pipefail - -${HOME}/bin/docker-compose -f .ci/jobs/docker-compose.yml up -d - -exit 0 diff --git a/Jenkinsfile b/Jenkinsfile index fc48060fdf3..29fdd5e60c4 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -309,7 +309,7 @@ def cloud(Map args = [:]) { withCloudTestEnv(args) { startCloudTestEnv(name: args.directory, dirs: args.dirs, withAWS: args.withAWS) try { - targetWithoutNode(context: args.context, command: args.command, directory: args.directory, label: args.label, withModule: args.withModule, isMage: true, id: args.id, name: args.directory) + targetWithoutNode(dirs: args.dirs, context: args.context, command: args.command, directory: args.directory, label: args.label, withModule: args.withModule, isMage: true, id: args.id) } finally { terraformCleanup(name: args.directory, dir: args.directory, withAWS: args.withAWS) } @@ -578,6 +578,7 @@ def target(Map args = [:]) { * - mage then the dir(location) is required, aka by enabling isMage: true. */ def targetWithoutNode(Map args = [:]) { + def dirs = args.get('dirs',[]) def command = args.command def context = args.context def directory = args.get('directory', '') @@ -590,23 +591,26 @@ def targetWithoutNode(Map args = [:]) { def enableRetry = args.get('enableRetry', false) def withGCP = args.get('withGCP', false) def withNodejs = args.get('withNodejs', false) - String name = normalise(args.name) + String name = normalise(args.directory) withGithubNotify(context: "${context}") { withBeatsEnv(archive: true, withModule: withModule, directory: directory, id: args.id) { dumpVariables() + // unstash terraform outputs in the same directory where the files were stashed + dirs?.each { folder -> + dir("${folder}") { + try { + unstash("terraform-${name}") + //unstash does not print verbose output , hence printing contents of the directory for logging purposes + sh "ls -la ${pwd()}" + } catch (error) { + echo "error unstashing: ${error}" + } + } + } withTools(k8s: installK8s, gcp: withGCP, nodejs: withNodejs) { // make commands use -C while mage commands require the dir(folder) // let's support this scenario with the location variable. dir(isMage ? directory : '') { - dir('input/awss3/_meta/terraform'){ - echo "terraform-${name}" - try { - unstash(name: "terraform-${name}") - sh "ls -la ${pwd()}" - } catch (error) { - echo "error unstashing: ${error}" - } - } if (enableRetry) { // Retry the same command to bypass any kind of flakiness. // Downside: genuine failures will be repeated. @@ -930,8 +934,6 @@ def startCloudTestEnv(Map args = [:]) { stage("${name}-prepare-cloud-env"){ withBeatsEnv(archive: false, withModule: false) { try { - // Run the docker compose file to setup the emulated cloud environment - sh(label: 'Run docker-compose services for emulated cloud env', script: ".ci/scripts/install-docker-services.sh ", returnStatus: true) dirs?.each { folder -> retryWithSleep(retries: 2, seconds: 5, backoff: true){ terraformApply(folder) @@ -942,17 +944,17 @@ def startCloudTestEnv(Map args = [:]) { // If it failed then cleanup without failing the build sh(label: 'Terraform Cleanup', script: ".ci/scripts/terraform-cleanup.sh ${folder}", returnStatus: true) } - // Cleanup the docker services - sh(label: 'Docker Compose Cleanup', script: ".ci/scripts/docker-services-cleanup.sh", returnStatus: true) error('startCloudTestEnv: terraform apply failed.') } finally { - // Archive terraform states in case manual cleanup is needed. - archiveArtifacts(allowEmptyArchive: true, artifacts: '**/terraform.tfstate') + dirs?.each { folder -> + // Archive terraform states in case manual cleanup is needed. + archiveArtifacts(allowEmptyArchive: true, artifacts: '**/terraform.tfstate') + dir("${folder}") { + stash(name: "terraform-${name}", allowEmpty: true, includes: '**/terraform.tfstate,**/.terraform/**,outputs*.yml') + } + } } - dir("x-pack/filebeat/input/awss3/_meta/terraform"){ - stash(name: "terraform-${name}", allowEmpty: true, includes: '**/terraform.tfstate,**/.terraform/**,*.yml') - } } } } @@ -984,12 +986,10 @@ def terraformCleanup(Map args = [:]) { String directory = args.dir stage("${name}-tear-down-cloud-env"){ withBeatsEnv(archive: false, withModule: false) { - unstash(name: "terraform-${name}") + unstash("terraform-${name}") retryWithSleep(retries: 2, seconds: 5, backoff: true) { sh(label: "Terraform Cleanup", script: ".ci/scripts/terraform-cleanup.sh ${directory}") } - // Cleanup associated docker services - sh(label: 'Docker Compose Cleanup', script: ".ci/scripts/docker-services-cleanup.sh") } } } diff --git a/x-pack/filebeat/docker-compose.yml b/x-pack/filebeat/docker-compose.yml index 5c98c9b459c..9416f67d07e 100644 --- a/x-pack/filebeat/docker-compose.yml +++ b/x-pack/filebeat/docker-compose.yml @@ -7,6 +7,7 @@ services: depends_on: elasticsearch: { condition: service_healthy } cometd: { condition: service_healthy } + localstack: { condition: service_healthy } elasticsearch: extends: @@ -24,3 +25,18 @@ services: hostname: cometd ports: - 8080:8080 + + localstack: + container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}" + image: localstack/localstack:2.1.0 + ports: + - "127.0.0.1:4566:4566" # LocalStack Gateway + environment: + - DEBUG=1 + - DOCKER_HOST=unix:///var/run/docker.sock + - LOCALSTACK_HOST=localhost + - S3_HOSTNAME=localhost + - PROVIDER_OVERRIDE_S3=asf + volumes: + - "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack" + - "/var/run/docker.sock:/var/run/docker.sock" From 7abbde24d3f3fe4757ac13f13b9e6127b819757a Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Mon, 17 Jul 2023 15:48:10 +0200 Subject: [PATCH 05/11] Revert "Adapt to PR 36054" This reverts commit c85983fac3a4b64e0a4e0bcae5e3f99194ff7959. --- .ci/jobs/docker-compose.yml | 23 +++++++++++++ .ci/scripts/docker-services-cleanup.sh | 7 ++++ .ci/scripts/install-docker-services.sh | 7 ++++ Jenkinsfile | 46 +++++++++++++------------- x-pack/filebeat/docker-compose.yml | 16 --------- 5 files changed, 60 insertions(+), 39 deletions(-) create mode 100644 .ci/jobs/docker-compose.yml create mode 100755 .ci/scripts/docker-services-cleanup.sh create mode 100755 .ci/scripts/install-docker-services.sh diff --git a/.ci/jobs/docker-compose.yml b/.ci/jobs/docker-compose.yml new file mode 100644 index 00000000000..06de1acb91e --- /dev/null +++ b/.ci/jobs/docker-compose.yml @@ -0,0 +1,23 @@ +version: '2.3' +services: + # This is a proxy used to block beats until all services are healthy. + # See: https://github.com/docker/compose/issues/4369 + proxy_dep: + image: busybox + depends_on: + localstack: { condition: service_healthy } + + localstack: + container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}" + image: localstack/localstack:2.1.0 + ports: + - "127.0.0.1:4566:4566" # LocalStack Gateway + environment: + - DEBUG=1 + - DOCKER_HOST=unix:///var/run/docker.sock + - LOCALSTACK_HOST=localhost + - S3_HOSTNAME=localhost + - PROVIDER_OVERRIDE_S3=asf + volumes: + - "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack" + - "/var/run/docker.sock:/var/run/docker.sock" diff --git a/.ci/scripts/docker-services-cleanup.sh b/.ci/scripts/docker-services-cleanup.sh new file mode 100755 index 00000000000..1f718d63d19 --- /dev/null +++ b/.ci/scripts/docker-services-cleanup.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +set -exuo pipefail + +${HOME}/bin/docker-compose -f .ci/jobs/docker-compose.yml down -v + +exit 0 diff --git a/.ci/scripts/install-docker-services.sh b/.ci/scripts/install-docker-services.sh new file mode 100755 index 00000000000..c627db1cc15 --- /dev/null +++ b/.ci/scripts/install-docker-services.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +set -exuo pipefail + +${HOME}/bin/docker-compose -f .ci/jobs/docker-compose.yml up -d + +exit 0 diff --git a/Jenkinsfile b/Jenkinsfile index 29fdd5e60c4..fc48060fdf3 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -309,7 +309,7 @@ def cloud(Map args = [:]) { withCloudTestEnv(args) { startCloudTestEnv(name: args.directory, dirs: args.dirs, withAWS: args.withAWS) try { - targetWithoutNode(dirs: args.dirs, context: args.context, command: args.command, directory: args.directory, label: args.label, withModule: args.withModule, isMage: true, id: args.id) + targetWithoutNode(context: args.context, command: args.command, directory: args.directory, label: args.label, withModule: args.withModule, isMage: true, id: args.id, name: args.directory) } finally { terraformCleanup(name: args.directory, dir: args.directory, withAWS: args.withAWS) } @@ -578,7 +578,6 @@ def target(Map args = [:]) { * - mage then the dir(location) is required, aka by enabling isMage: true. */ def targetWithoutNode(Map args = [:]) { - def dirs = args.get('dirs',[]) def command = args.command def context = args.context def directory = args.get('directory', '') @@ -591,26 +590,23 @@ def targetWithoutNode(Map args = [:]) { def enableRetry = args.get('enableRetry', false) def withGCP = args.get('withGCP', false) def withNodejs = args.get('withNodejs', false) - String name = normalise(args.directory) + String name = normalise(args.name) withGithubNotify(context: "${context}") { withBeatsEnv(archive: true, withModule: withModule, directory: directory, id: args.id) { dumpVariables() - // unstash terraform outputs in the same directory where the files were stashed - dirs?.each { folder -> - dir("${folder}") { - try { - unstash("terraform-${name}") - //unstash does not print verbose output , hence printing contents of the directory for logging purposes - sh "ls -la ${pwd()}" - } catch (error) { - echo "error unstashing: ${error}" - } - } - } withTools(k8s: installK8s, gcp: withGCP, nodejs: withNodejs) { // make commands use -C while mage commands require the dir(folder) // let's support this scenario with the location variable. dir(isMage ? directory : '') { + dir('input/awss3/_meta/terraform'){ + echo "terraform-${name}" + try { + unstash(name: "terraform-${name}") + sh "ls -la ${pwd()}" + } catch (error) { + echo "error unstashing: ${error}" + } + } if (enableRetry) { // Retry the same command to bypass any kind of flakiness. // Downside: genuine failures will be repeated. @@ -934,6 +930,8 @@ def startCloudTestEnv(Map args = [:]) { stage("${name}-prepare-cloud-env"){ withBeatsEnv(archive: false, withModule: false) { try { + // Run the docker compose file to setup the emulated cloud environment + sh(label: 'Run docker-compose services for emulated cloud env', script: ".ci/scripts/install-docker-services.sh ", returnStatus: true) dirs?.each { folder -> retryWithSleep(retries: 2, seconds: 5, backoff: true){ terraformApply(folder) @@ -944,17 +942,17 @@ def startCloudTestEnv(Map args = [:]) { // If it failed then cleanup without failing the build sh(label: 'Terraform Cleanup', script: ".ci/scripts/terraform-cleanup.sh ${folder}", returnStatus: true) } + // Cleanup the docker services + sh(label: 'Docker Compose Cleanup', script: ".ci/scripts/docker-services-cleanup.sh", returnStatus: true) error('startCloudTestEnv: terraform apply failed.') } finally { - dirs?.each { folder -> - // Archive terraform states in case manual cleanup is needed. - archiveArtifacts(allowEmptyArchive: true, artifacts: '**/terraform.tfstate') - dir("${folder}") { - stash(name: "terraform-${name}", allowEmpty: true, includes: '**/terraform.tfstate,**/.terraform/**,outputs*.yml') - } - } + // Archive terraform states in case manual cleanup is needed. + archiveArtifacts(allowEmptyArchive: true, artifacts: '**/terraform.tfstate') } + dir("x-pack/filebeat/input/awss3/_meta/terraform"){ + stash(name: "terraform-${name}", allowEmpty: true, includes: '**/terraform.tfstate,**/.terraform/**,*.yml') + } } } } @@ -986,10 +984,12 @@ def terraformCleanup(Map args = [:]) { String directory = args.dir stage("${name}-tear-down-cloud-env"){ withBeatsEnv(archive: false, withModule: false) { - unstash("terraform-${name}") + unstash(name: "terraform-${name}") retryWithSleep(retries: 2, seconds: 5, backoff: true) { sh(label: "Terraform Cleanup", script: ".ci/scripts/terraform-cleanup.sh ${directory}") } + // Cleanup associated docker services + sh(label: 'Docker Compose Cleanup', script: ".ci/scripts/docker-services-cleanup.sh") } } } diff --git a/x-pack/filebeat/docker-compose.yml b/x-pack/filebeat/docker-compose.yml index 9416f67d07e..5c98c9b459c 100644 --- a/x-pack/filebeat/docker-compose.yml +++ b/x-pack/filebeat/docker-compose.yml @@ -7,7 +7,6 @@ services: depends_on: elasticsearch: { condition: service_healthy } cometd: { condition: service_healthy } - localstack: { condition: service_healthy } elasticsearch: extends: @@ -25,18 +24,3 @@ services: hostname: cometd ports: - 8080:8080 - - localstack: - container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}" - image: localstack/localstack:2.1.0 - ports: - - "127.0.0.1:4566:4566" # LocalStack Gateway - environment: - - DEBUG=1 - - DOCKER_HOST=unix:///var/run/docker.sock - - LOCALSTACK_HOST=localhost - - S3_HOSTNAME=localhost - - PROVIDER_OVERRIDE_S3=asf - volumes: - - "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack" - - "/var/run/docker.sock:/var/run/docker.sock" From d6e8f9fa21e79010132983ffb43c94424d2a746e Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Mon, 17 Jul 2023 15:56:42 +0200 Subject: [PATCH 06/11] Adapt Jenkinsfile --- Jenkinsfile | 38 +++++++++++-------- .../input/awss3/input_integration_test.go | 11 +++--- 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index fc48060fdf3..95c0ea3d8ba 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -309,7 +309,7 @@ def cloud(Map args = [:]) { withCloudTestEnv(args) { startCloudTestEnv(name: args.directory, dirs: args.dirs, withAWS: args.withAWS) try { - targetWithoutNode(context: args.context, command: args.command, directory: args.directory, label: args.label, withModule: args.withModule, isMage: true, id: args.id, name: args.directory) + targetWithoutNode(dirs: args.dirs, context: args.context, command: args.command, directory: args.directory, label: args.label, withModule: args.withModule, isMage: true, id: args.id) } finally { terraformCleanup(name: args.directory, dir: args.directory, withAWS: args.withAWS) } @@ -578,6 +578,7 @@ def target(Map args = [:]) { * - mage then the dir(location) is required, aka by enabling isMage: true. */ def targetWithoutNode(Map args = [:]) { + def dirs = args.get('dirs',[]) def command = args.command def context = args.context def directory = args.get('directory', '') @@ -590,23 +591,26 @@ def targetWithoutNode(Map args = [:]) { def enableRetry = args.get('enableRetry', false) def withGCP = args.get('withGCP', false) def withNodejs = args.get('withNodejs', false) - String name = normalise(args.name) + String name = normalise(args.directory) withGithubNotify(context: "${context}") { withBeatsEnv(archive: true, withModule: withModule, directory: directory, id: args.id) { dumpVariables() + // unstash terraform outputs in the same directory where the files were stashed + dirs?.each { folder -> + dir("${folder}") { + try { + unstash("terraform-${name}") + //unstash does not print verbose output , hence printing contents of the directory for logging purposes + sh "ls -la ${pwd()}" + } catch (error) { + echo "error unstashing: ${error}" + } + } + } withTools(k8s: installK8s, gcp: withGCP, nodejs: withNodejs) { // make commands use -C while mage commands require the dir(folder) // let's support this scenario with the location variable. dir(isMage ? directory : '') { - dir('input/awss3/_meta/terraform'){ - echo "terraform-${name}" - try { - unstash(name: "terraform-${name}") - sh "ls -la ${pwd()}" - } catch (error) { - echo "error unstashing: ${error}" - } - } if (enableRetry) { // Retry the same command to bypass any kind of flakiness. // Downside: genuine failures will be repeated. @@ -947,12 +951,14 @@ def startCloudTestEnv(Map args = [:]) { error('startCloudTestEnv: terraform apply failed.') } finally { - // Archive terraform states in case manual cleanup is needed. - archiveArtifacts(allowEmptyArchive: true, artifacts: '**/terraform.tfstate') + dirs?.each { folder -> + // Archive terraform states in case manual cleanup is needed. + archiveArtifacts(allowEmptyArchive: true, artifacts: '**/terraform.tfstate') + dir("${folder}") { + stash(name: "terraform-${name}", allowEmpty: true, includes: '**/terraform.tfstate,**/.terraform/**,outputs*.yml') + } + } } - dir("x-pack/filebeat/input/awss3/_meta/terraform"){ - stash(name: "terraform-${name}", allowEmpty: true, includes: '**/terraform.tfstate,**/.terraform/**,*.yml') - } } } } diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 6bfdf5e7a3b..758cf6d7a2a 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -119,6 +119,7 @@ func makeTestConfigSQS(queueURL string) *conf.C { queue_url: %s max_number_of_messages: 1 visibility_timeout: 30s +region: us-east-1 file_selectors: - regex: 'events-array.json$' @@ -267,7 +268,7 @@ func TestInputRunSQSOnLocalstack(t *testing.T) { assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 8) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), uint64(0x13)) assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) - assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) + //assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) } func TestInputRunSQS(t *testing.T) { @@ -314,12 +315,12 @@ func TestInputRunSQS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) + //assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) - assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) + //assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) } func TestInputRunS3(t *testing.T) { @@ -554,10 +555,10 @@ func TestInputRunSNS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) + //assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) - assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) + //assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) } From 2f7958b0b4067ae09e8c62336bab811b55735e0d Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Tue, 18 Jul 2023 00:12:05 +0200 Subject: [PATCH 07/11] fix panic in SQS input metrics --- .../filebeat/input/awss3/input_integration_test.go | 12 ++++++------ x-pack/filebeat/input/awss3/metrics.go | 1 + 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index c03918d5710..001f726feef 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -263,12 +263,12 @@ func TestInputRunSQSOnLocalstack(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - // assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) + assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 8) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), uint64(0x13)) assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) - //assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) + assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end } func TestInputRunSQS(t *testing.T) { @@ -315,12 +315,12 @@ func TestInputRunSQS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - //assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) - Issue created - https://github.com/elastic/beats/issues/36077 + assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) - //assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) - Issue created - https://github.com/elastic/beats/issues/36077 + assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end } func TestInputRunS3(t *testing.T) { @@ -555,10 +555,10 @@ func TestInputRunSNS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - //assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) - Issue created - https://github.com/elastic/beats/issues/36077 + assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) - //assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) - Issue created - https://github.com/elastic/beats/issues/36077 + assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end } diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index df535a2d473..0bff7d057cd 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -174,6 +174,7 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers sqsMessagesInflight: monitoring.NewUint(reg, "sqs_messages_inflight_gauge"), sqsMessagesReturnedTotal: monitoring.NewUint(reg, "sqs_messages_returned_total"), sqsMessagesDeletedTotal: monitoring.NewUint(reg, "sqs_messages_deleted_total"), + sqsMessagesWaiting: monitoring.NewInt(reg, "sqs_messages_waiting_gauge"), sqsWorkerUtilization: monitoring.NewFloat(reg, "sqs_worker_utilization"), sqsMessageProcessingTime: metrics.NewUniformSample(1024), sqsLagTime: metrics.NewUniformSample(1024), From ca72191d9fd48eb826c52626f6bb9d2b351540b0 Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Wed, 19 Jul 2023 09:05:53 +0200 Subject: [PATCH 08/11] Fix SNS test --- x-pack/filebeat/input/awss3/input.go | 4 ++-- .../filebeat/input/awss3/input_integration_test.go | 2 +- x-pack/filebeat/input/awss3/metrics.go | 13 +------------ x-pack/filebeat/input/awss3/sqs.go | 3 +++ 4 files changed, 7 insertions(+), 15 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index b65c8aaa7f6..83cf205b3bc 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -412,12 +412,12 @@ func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { switch apiError.ErrorCode() { case sqsAccessDeniedErrorCode: // stop polling if auth error is encountered - receiver.metrics.setSQSMessagesWaiting(int64(count)) + receiver.metrics.sqsMessagesWaiting.Set(int64(count)) return } } - receiver.metrics.setSQSMessagesWaiting(int64(count)) + receiver.metrics.sqsMessagesWaiting.Set(int64(count)) } } } diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 001f726feef..eed45fcc95d 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -522,7 +522,7 @@ func TestInputRunSNS(t *testing.T) { drainSQS(t, tfConfig.AWSRegion, tfConfig.QueueURLForSNS, awsCfg) s3Client := s3.NewFromConfig(awsCfg) - uploadS3TestFiles(t, tfConfig.AWSRegion, tfConfig.BucketName, s3Client, + uploadS3TestFiles(t, tfConfig.AWSRegion, tfConfig.BucketNameForSNS, s3Client, "testdata/events-array.json", "testdata/invalid.json", "testdata/log.json", diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index 0bff7d057cd..36ca0617856 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -79,18 +79,6 @@ func (m *inputMetrics) Close() { m.unregister() } -func (m *inputMetrics) setSQSMessagesWaiting(count int64) { - if m.sqsMessagesWaiting == nil { - // if metric not initialized, and count is -1, do nothing - if count == -1 { - return - } - m.sqsMessagesWaiting = monitoring.NewInt(m.registry, "sqs_messages_waiting_gauge") - } - - m.sqsMessagesWaiting.Set(count) -} - // beginSQSWorker tracks the start of a new SQS worker. The returned ID // must be used to call endSQSWorker when the worker finishes. It also // increments the sqsMessagesInflight counter. @@ -187,6 +175,7 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers s3ObjectsInflight: monitoring.NewUint(reg, "s3_objects_inflight_gauge"), s3ObjectProcessingTime: metrics.NewUniformSample(1024), } + out.sqsMessagesWaiting.Set(int64(-1)) adapter.NewGoMetrics(reg, "sqs_message_processing_time", adapter.Accept). Register("histogram", metrics.NewHistogram(out.sqsMessageProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible. adapter.NewGoMetrics(reg, "sqs_lag_time", adapter.Accept). diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index 01ed8bfb183..2585ce0b181 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -79,6 +79,9 @@ func (r *sqsReader) Receive(ctx context.Context) error { // Process each SQS message asynchronously with a goroutine. r.log.Debugf("Received %v SQS messages.", len(msgs)) r.metrics.sqsMessagesReceivedTotal.Add(uint64(len(msgs))) + // Initialize the sqs_message_waiting_guage to 0 to indicate that that SQS messages are received. + // PollSqsWaitingMetric shall reassign the value if there are messages waiting + r.metrics.sqsMessagesWaiting.Set(int64(0)) workerWg.Add(len(msgs)) for _, msg := range msgs { go func(msg types.Message, start time.Time) { From c3446360506fa7e3cedc02931963a034d455771f Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Wed, 19 Jul 2023 11:22:36 +0200 Subject: [PATCH 09/11] Refactor files to fix some issues --- .ci/jobs/docker-compose.yml | 4 +-- .ci/scripts/docker-services-cleanup.sh | 2 +- .ci/scripts/install-docker-services.sh | 2 +- Jenkinsfile | 4 +-- .../input/awss3/_meta/terraform/localstack.tf | 1 - x-pack/filebeat/input/awss3/input.go | 1 - .../input/awss3/input_integration_test.go | 28 +++++++------------ x-pack/filebeat/input/awss3/sqs.go | 2 +- 8 files changed, 17 insertions(+), 27 deletions(-) diff --git a/.ci/jobs/docker-compose.yml b/.ci/jobs/docker-compose.yml index 06de1acb91e..e9fc43ff704 100644 --- a/.ci/jobs/docker-compose.yml +++ b/.ci/jobs/docker-compose.yml @@ -8,8 +8,8 @@ services: localstack: { condition: service_healthy } localstack: - container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}" - image: localstack/localstack:2.1.0 + container_name: "${localstack_integration_test_container}" + image: localstack/localstack:2.1.0 # Latest stable release ports: - "127.0.0.1:4566:4566" # LocalStack Gateway environment: diff --git a/.ci/scripts/docker-services-cleanup.sh b/.ci/scripts/docker-services-cleanup.sh index 1f718d63d19..cc182413a2e 100755 --- a/.ci/scripts/docker-services-cleanup.sh +++ b/.ci/scripts/docker-services-cleanup.sh @@ -4,4 +4,4 @@ set -exuo pipefail ${HOME}/bin/docker-compose -f .ci/jobs/docker-compose.yml down -v -exit 0 +exit $? diff --git a/.ci/scripts/install-docker-services.sh b/.ci/scripts/install-docker-services.sh index c627db1cc15..420362f8355 100755 --- a/.ci/scripts/install-docker-services.sh +++ b/.ci/scripts/install-docker-services.sh @@ -4,4 +4,4 @@ set -exuo pipefail ${HOME}/bin/docker-compose -f .ci/jobs/docker-compose.yml up -d -exit 0 +exit $? diff --git a/Jenkinsfile b/Jenkinsfile index 95c0ea3d8ba..fd4fa2a6e9b 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -118,7 +118,7 @@ pipeline { } } steps { - runBuildAndTest(filterStage: 'extended') + runBuildAndTest(filterStage: 'mandatory') } } stage('Extended') { @@ -934,7 +934,7 @@ def startCloudTestEnv(Map args = [:]) { stage("${name}-prepare-cloud-env"){ withBeatsEnv(archive: false, withModule: false) { try { - // Run the docker compose file to setup the emulated cloud environment + // Run the docker services to setup the emulated cloud environment sh(label: 'Run docker-compose services for emulated cloud env', script: ".ci/scripts/install-docker-services.sh ", returnStatus: true) dirs?.each { folder -> retryWithSleep(retries: 2, seconds: 5, backoff: true){ diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/localstack.tf b/x-pack/filebeat/input/awss3/_meta/terraform/localstack.tf index edef6fb6770..2d2fcbe09ec 100644 --- a/x-pack/filebeat/input/awss3/_meta/terraform/localstack.tf +++ b/x-pack/filebeat/input/awss3/_meta/terraform/localstack.tf @@ -7,7 +7,6 @@ provider "aws" { skip_credentials_validation = true skip_metadata_api_check = true skip_requesting_account_id = true -// s3_force_path_style = true endpoints { apigateway = "http://localhost:4566" diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 54d4877acc8..066da60e6d8 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -191,7 +191,6 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled } o.UsePathStyle = in.config.PathStyle - }), } diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index eed45fcc95d..a070b54547c 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -56,6 +56,7 @@ func getTerraformOutputs(t *testing.T, isLocalStack bool) terraformOutputData { t.Helper() _, filename, _, _ := runtime.Caller(0) + var outputFile string if isLocalStack { outputFile = terraformOutputLsYML @@ -63,17 +64,9 @@ func getTerraformOutputs(t *testing.T, isLocalStack bool) terraformOutputData { outputFile = terraformOutputYML } - files, err := ioutil.ReadDir(path.Join(path.Dir(filename), "_meta/terraform")) - if err != nil { - t.Fatalf("failed reading directory: %v", err) - } - for _, file := range files { - t.Logf(file.Name()) - } - ymlData, err := ioutil.ReadFile(path.Join(path.Dir(filename), outputFile)) if os.IsNotExist(err) { - t.Skipf("Run 'terraform apply' in %v to setup S3 and SQS for the test ------- %v", filepath.Dir(terraformOutputYML), path.Join(path.Dir(filename), outputFile)) + t.Skipf("Run 'terraform apply' in %v to setup S3 and SQS for the test.", filepath.Dir(outputFile)) } if err != nil { t.Fatalf("failed reading terraform output data: %v", err) @@ -197,8 +190,8 @@ func makeLocalstackConfig(awsRegion string) (aws.Config, error) { ) } -// Tests reading SQS notifcation via awss3 input when an Object is PUT into S3 -// and a notification is generated to SQS using Localstack +// Tests reading SQS notifcation via awss3 input when an object is PUT in S3 +// and a notification is generated to SQS on Localstack func TestInputRunSQSOnLocalstack(t *testing.T) { logp.TestingSetup() @@ -217,13 +210,6 @@ func TestInputRunSQSOnLocalstack(t *testing.T) { t.Fatal(err) } - // Initialize s3Input with the test config - s3Input := &s3Input{ - config: config, - awsConfig: awsCfg, - store: openTestStatestore(), - } - // Ensure SQS is empty before testing. drainSQS(t, region, queueUrl, awsCfg) @@ -248,6 +234,12 @@ func TestInputRunSQSOnLocalstack(t *testing.T) { cancel() }) + // Initialize s3Input with the test config + s3Input := &s3Input{ + config: config, + awsConfig: awsCfg, + store: openTestStatestore(), + } // Run S3 Input with desired context var errGroup errgroup.Group errGroup.Go(func() error { diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index 2585ce0b181..47df0c45c68 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -79,7 +79,7 @@ func (r *sqsReader) Receive(ctx context.Context) error { // Process each SQS message asynchronously with a goroutine. r.log.Debugf("Received %v SQS messages.", len(msgs)) r.metrics.sqsMessagesReceivedTotal.Add(uint64(len(msgs))) - // Initialize the sqs_message_waiting_guage to 0 to indicate that that SQS messages are received. + // Initialize the sqs_message_waiting_gauge to 0 to indicate that that SQS messages are received. // PollSqsWaitingMetric shall reassign the value if there are messages waiting r.metrics.sqsMessagesWaiting.Set(int64(0)) workerWg.Add(len(msgs)) From 4fb2fdb1856477d34874d279976ee131dcc7d11e Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Thu, 20 Jul 2023 09:32:43 +0200 Subject: [PATCH 10/11] Add documentation --- CHANGELOG.next.asciidoc | 1 + Jenkinsfile | 2 +- .../input/awss3/_meta/terraform/README.md | 37 +++++++++++++++++++ x-pack/filebeat/input/awss3/input.go | 7 +++- 4 files changed, 45 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8b01cf6c662..25fefe9533b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -350,6 +350,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415 - Add `clean_session` configuration setting for MQTT input. {pull}35806[16204] - Add fingerprint mode for the filestream scanner and new file identity based on it {issue}34419[34419] {pull}35734[35734] - Add file system metadata to events ingested via filestream {issue}35801[35801] {pull}36065[36065] +- Add support for localstack based input integration testing {pull}35727[35727] *Auditbeat* - Migration of system/package module storage from gob encoding to flatbuffer encoding in bolt db. {pull}34817[34817] diff --git a/Jenkinsfile b/Jenkinsfile index fd4fa2a6e9b..a0231b94957 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -990,7 +990,7 @@ def terraformCleanup(Map args = [:]) { String directory = args.dir stage("${name}-tear-down-cloud-env"){ withBeatsEnv(archive: false, withModule: false) { - unstash(name: "terraform-${name}") + unstash("terraform-${name}") retryWithSleep(retries: 2, seconds: 5, backoff: true) { sh(label: "Terraform Cleanup", script: ".ci/scripts/terraform-cleanup.sh ${directory}") } diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/README.md b/x-pack/filebeat/input/awss3/_meta/terraform/README.md index d5614b99a92..ea0e2ae5775 100644 --- a/x-pack/filebeat/input/awss3/_meta/terraform/README.md +++ b/x-pack/filebeat/input/awss3/_meta/terraform/README.md @@ -4,6 +4,7 @@ This directory contains a Terraform module that creates the AWS resources needed for executing the integration tests for the `aws-s3` Filebeat input. It creates an S3 bucket and SQS queue and configures S3 `ObjectCreated:*` notifications to be delivered to SQS. It also creates a second S3 bucket, SNS topic, SQS queue and configures S3 `ObjectCreated:*` notifications to be delivered to SNS and also creates a subscription for this SNS topic to SQS queue to automatically place messages sent to SNS topic in SQS queue. +## Cloud AWS environment It outputs configuration information that is consumed by the tests to `outputs.yml`. The AWS resources are randomly named to prevent name collisions @@ -42,4 +43,40 @@ the S3 bucket and its contents. `terraform destroy` +## Emulated cloud Localstack environment +It outputs configuration information that is consumed by the tests to +`outputs-localstack.yml`. The AWS resources are randomly named to prevent name collisions +between multiple users. + +### Usage + +You must have the appropriate Localstack environment up and running in docker. +You can use `.ci/jobs/docker-compose.yml` to spin up localstack environment. + +1. Execute terraform in this directory to create the resources. This will also +write the `outputs-localstack.yml`. You can use `export TF_VAR_aws_region=NNNNN` in order +to match the AWS region of the profile you are using. + + `terraform apply` + + +2. (Optional) View the output configuration. + + ```yaml + "aws_region": "us-east-1" + "bucket_name": "filebeat-s3-integtest-8iok1h" + "queue_url": "https://localhost:4566/000000000000/filebeat-s3-integtest-8iok1h" + ``` + +4. Execute the integration test. + + ``` + cd x-pack/filebeat/input/awss3 + go test -tags aws,integration -run TestInputRun*Localstack* -v . + ``` + +5. Cleanup AWS resources. Execute terraform to remove the SQS queue and delete +the S3 bucket and its contents. + + `terraform destroy` \ No newline at end of file diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 066da60e6d8..d56ab8dbccb 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -412,6 +412,10 @@ func getProviderFromDomain(endpoint string, ProviderOverride string) string { func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { t := time.NewTicker(time.Minute) defer t.Stop() + // Initialize the metric to 0 at the start of the one minute time interval to avoid + // giving misleading metric value -1 even though SQS messages are processed. + // The value will be updated every minute + receiver.metrics.sqsMessagesWaiting.Set(int64(0)) for { select { case <-ctx.Done(): @@ -424,7 +428,8 @@ func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { switch apiError.ErrorCode() { case sqsAccessDeniedErrorCode: // stop polling if auth error is encountered - receiver.metrics.sqsMessagesWaiting.Set(int64(count)) + // Set it back to -1 because there is a permission error + receiver.metrics.sqsMessagesWaiting.Set(int64(-1)) return } } From 207e80d13a80b1042e1674cd29111716febc4caf Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Tue, 25 Jul 2023 06:24:27 +0200 Subject: [PATCH 11/11] Fix PR comments --- x-pack/filebeat/input/awss3/_meta/terraform/README.md | 1 + x-pack/filebeat/input/awss3/input.go | 4 ---- x-pack/filebeat/input/awss3/input_integration_test.go | 3 +-- x-pack/filebeat/input/awss3/metrics.go | 1 + x-pack/filebeat/input/awss3/sqs.go | 3 --- 5 files changed, 3 insertions(+), 9 deletions(-) diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/README.md b/x-pack/filebeat/input/awss3/_meta/terraform/README.md index ea0e2ae5775..41100d98dad 100644 --- a/x-pack/filebeat/input/awss3/_meta/terraform/README.md +++ b/x-pack/filebeat/input/awss3/_meta/terraform/README.md @@ -4,6 +4,7 @@ This directory contains a Terraform module that creates the AWS resources needed for executing the integration tests for the `aws-s3` Filebeat input. It creates an S3 bucket and SQS queue and configures S3 `ObjectCreated:*` notifications to be delivered to SQS. It also creates a second S3 bucket, SNS topic, SQS queue and configures S3 `ObjectCreated:*` notifications to be delivered to SNS and also creates a subscription for this SNS topic to SQS queue to automatically place messages sent to SNS topic in SQS queue. + ## Cloud AWS environment It outputs configuration information that is consumed by the tests to diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index c0b16d79566..c50b4bb4c42 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -418,10 +418,6 @@ func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { t := time.NewTicker(time.Minute) defer t.Stop() - // Initialize the metric to 0 at the start of the one minute time interval to avoid - // giving misleading metric value -1 even though SQS messages are processed. - // The value will be updated every minute - receiver.metrics.sqsMessagesWaiting.Set(int64(0)) for { select { case <-ctx.Done(): diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 0fa7a0ddf8b..62cbc835011 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -213,7 +213,7 @@ func TestInputRunSQSOnLocalstack(t *testing.T) { // Ensure SQS is empty before testing. drainSQS(t, region, queueUrl, awsCfg) - //Upload test files to S3 to generate SQS notification + // Upload test files to S3 to generate SQS notification s3Client := s3.NewFromConfig(awsCfg, func(o *s3.Options) { o.UsePathStyle = true }) @@ -255,7 +255,6 @@ func TestInputRunSQSOnLocalstack(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 8) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), uint64(0x13)) diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index b26e6ca3f56..bef57210ca6 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -175,6 +175,7 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers s3ObjectsInflight: monitoring.NewUint(reg, "s3_objects_inflight_gauge"), s3ObjectProcessingTime: metrics.NewUniformSample(1024), } + // Initializing the sqs_messages_waiting_gauge value to -1 so that we can distinguish between no messages waiting (0) and never collected / error collecting (-1). out.sqsMessagesWaiting.Set(int64(-1)) adapter.NewGoMetrics(reg, "sqs_message_processing_time", adapter.Accept). diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index 9eae96366e3..dd454a3bfb9 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -79,9 +79,6 @@ func (r *sqsReader) Receive(ctx context.Context) error { // Process each SQS message asynchronously with a goroutine. r.log.Debugf("Received %v SQS messages.", len(msgs)) r.metrics.sqsMessagesReceivedTotal.Add(uint64(len(msgs))) - // Initialize the sqs_message_waiting_gauge to 0 to indicate that that SQS messages are received. - // PollSqsWaitingMetric shall reassign the value if there are messages waiting - r.metrics.sqsMessagesWaiting.Set(int64(0)) workerWg.Add(len(msgs)) for _, msg := range msgs {