From cfa245c8cb8d9fed721dee59c78226aa50f1ac90 Mon Sep 17 00:00:00 2001
From: Saravjeet 'Aman' Singh
Date: Sat, 6 Jul 2024 19:57:09 +0530
Subject: [PATCH] feat: Kinesis sink (#84)

* misc: remove use of deprecated ioutil package
* kafka: clean up tests
* publisher: kinesis: add stub
* publisher: kinesis: add testing scaffold
* publisher: fail integration tests on missing configuration
* publisher: pubsub: add docs for future enhancements
* publisher: kinesis: add tests for stream existence
* publisher: kinesis: add support for synchronously deleting streams
* publisher: kinesis: add tests for message production
* publisher: kinesis: use randomised partition keys
* publisher: kinesis: add tests for auto stream creation
* publisher: kinesis: add tests for stream pattern
* publisher: kinesis: cache streams
* publisher: kinesis: optimise stream checks
* ci: add integration test for kinesis
* server: integrate kinesis publisher
* publisher: kinesis: add publish timeout
* app: wrangle config for kinesis publisher
* app: add support for AWS credentials and region
* docs: add configuration for kinesis publisher
* docs: kinesis: add alternative way for specifying credentials
* publisher: kinesis: add metrics
* docs: add kinesis metrics
* metrics: whitelist metrics for kinesis
* kafka publisher: bugfix erroneous metric
* publisher: pubsub: fix unknown topic metric

  pubsub publisher was reporting `pubsub_unknown_topic_failure_total`
  for errors that were not strictly non-existent topic errors.

* publisher: pubsub: make publish timeout per batch (vs per event)
* publisher: kinesis: disable logs in tests
* publisher: pubsub: handle replica topic creation gracefully
* publisher: kinesis: make metrics consistent with other publishers
* publisher: pubsub: simplify publish settings
* fix: docker sock fallthrough
---
 .github/workflows/integration-test.yaml |  13 +-
 Makefile                                |   4 +-
 app/server.go                           |  29 ++-
 config/publisher.go                     |  74 +++++-
 docker-compose.yml                      |   8 +
 docs/docs/reference/configurations.md   |  58 ++++-
 docs/docs/reference/metrics.md          |  26 +++
 go.mod                                  |  16 ++
 go.sum                                  |  33 +++
 metrics/prometheus.go                   |  13 ++
 metrics/prometheus_test.go              |   4 +-
 publisher/kafka/kafka.go                |   6 +-
 publisher/kafka/kafka_test.go           |   8 +-
 publisher/kinesis/kinesis.go            | 282 ++++++++++++++++++++++
 publisher/kinesis/kinesis_test.go       | 296 ++++++++++++++++++++++++
 publisher/pubsub/error.go               |  13 ++
 publisher/pubsub/pubsub.go              |  49 ++--
 publisher/pubsub/pubsub_test.go         |   8 +-
 18 files changed, 899 insertions(+), 41 deletions(-)
 create mode 100644 publisher/kinesis/kinesis.go
 create mode 100644 publisher/kinesis/kinesis_test.go
 create mode 100644 publisher/pubsub/error.go

diff --git a/.github/workflows/integration-test.yaml b/.github/workflows/integration-test.yaml
index e49852d0..dd930198 100644
--- a/.github/workflows/integration-test.yaml
+++ b/.github/workflows/integration-test.yaml
@@ -28,4 +28,15 @@ jobs:
       - name: start pubsub emulator
         run: docker compose up pubsub-emulator -d
       - name: run integration tests
-        run: PUBSUB_EMULATOR_HOST=localhost:8085 go test ./publisher/pubsub -v
\ No newline at end of file
+        run: PUBSUB_EMULATOR_HOST=localhost:8085 go test ./publisher/pubsub -v
+  kinesis-test:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout repo
+        uses: actions/checkout@v2
+      - name: setup integration environment
+        uses: ./.github/workflows/integration-test-setup
+      - name: start localstack (kinesis emulator)
+        run: docker compose up localstack -d
+      - name: run integration tests
+        run: LOCALSTACK_HOST=http://localhost:4566 go test ./publisher/kinesis/ -v
\ No newline at end of file
diff --git a/Makefile b/Makefile
index edf40320..bb29175a 100644
--- a/Makefile
+++ b/Makefile
@@ -52,7 +52,7 @@ install:
	@go install

 test: ## Run the tests
-	go test $(shell go list ./... | grep -v "vendor" | grep -v "integration") -v
+	go test $(shell go list ./... | grep -v "vendor" | grep -v "integration" | grep -v "pubsub" | grep -v "kinesis") -v

 test-bench: # run benchmark tests
	@go test $(shell go list ./... | grep -v "vendor") -v -bench ./... -run=^Benchmark
@@ -63,4 +63,4 @@ vendor: ## Update the vendor directory

 docker-run:
	docker compose build
-	docker compose up -d
\ No newline at end of file
+	docker compose up -d
diff --git a/app/server.go b/app/server.go
index 5efaa093..eb6eaf58 100644
--- a/app/server.go
+++ b/app/server.go
@@ -10,16 +10,22 @@ import (
 	"syscall"
 	"time"

-	pubsubsdk "cloud.google.com/go/pubsub"
+	"github.com/raystack/raccoon/collector"
 	"github.com/raystack/raccoon/config"
 	"github.com/raystack/raccoon/logger"
 	"github.com/raystack/raccoon/metrics"
 	"github.com/raystack/raccoon/publisher"
 	"github.com/raystack/raccoon/publisher/kafka"
+	"github.com/raystack/raccoon/publisher/kinesis"
 	"github.com/raystack/raccoon/publisher/pubsub"
 	"github.com/raystack/raccoon/services"
 	"github.com/raystack/raccoon/worker"
+
+	pubsubsdk "cloud.google.com/go/pubsub"
+	awsconfig "github.com/aws/aws-sdk-go-v2/config"
+	kinesissdk "github.com/aws/aws-sdk-go-v2/service/kinesis"
+	"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
 	"google.golang.org/api/option"
 )
@@ -140,6 +146,27 @@ func initPublisher() (Publisher, error) {
 			pubsub.WithByteThreshold(config.PublisherPubSub.PublishByteThreshold),
 			pubsub.WithTimeout(config.PublisherPubSub.PublishTimeout),
 		)
+	case "kinesis":
+		cfg, err := awsconfig.LoadDefaultConfig(
+			context.Background(),
+			awsconfig.WithRegion(config.PublisherKinesis.Region),
+			awsconfig.WithSharedConfigFiles(
+				[]string{config.PublisherKinesis.CredentialsFile},
+			),
+		)
+		if err != nil {
+			return nil, fmt.Errorf("error locating aws credentials: %w", err)
+		}
+		conf := config.PublisherKinesis
+		return kinesis.New(
+			kinesissdk.NewFromConfig(cfg),
+			kinesis.WithStreamPattern(config.EventDistribution.PublisherPattern),
+			kinesis.WithStreamAutocreate(conf.StreamAutoCreate),
+			kinesis.WithStreamMode(types.StreamMode(conf.StreamMode)),
+			kinesis.WithShards(conf.DefaultShards),
+			kinesis.WithPublishTimeout(conf.PublishTimeout),
+			kinesis.WithStreamProbeInterval(conf.StreamProbeInterval),
+		)
 	default:
 		return nil, fmt.Errorf("unknown publisher: %v", config.Publisher)
 	}
diff --git a/config/publisher.go b/config/publisher.go
index 200dd93d..da80e510 100644
--- a/config/publisher.go
+++ b/config/publisher.go
@@ -2,7 +2,9 @@ package config

 import (
 	"bytes"
+	"fmt"
 	"os"
+	"path/filepath"
 	"strings"
 	"time"
@@ -14,6 +16,7 @@ import (
 var Publisher string
 var PublisherKafka publisherKafka
 var PublisherPubSub publisherPubSub
+var PublisherKinesis publisherKinesis
 var dynamicKafkaClientConfigPrefix = "PUBLISHER_KAFKA_CLIENT_"

 type publisherPubSub struct {
@@ -27,6 +30,16 @@ type publisherPubSub struct {
 	CredentialsFile string
 }

+type publisherKinesis struct {
+	Region              string
+	CredentialsFile     string
+	StreamAutoCreate    bool
+	StreamProbeInterval time.Duration
+	StreamMode          string
+	DefaultShards       uint32
+	PublishTimeout      time.Duration
+}
+
 type publisherKafka struct {
 	FlushInterval int
 }
@@ -65,13 +78,15 @@ func publisherKafkaConfigLoader() {
 }

 func publisherPubSubLoader() {
-	envCredentialsFile := "PUBLISHER_PUBSUB_CREDENTIALS"
-	envTopicAutoCreate := "PUBLISHER_PUBSUB_TOPIC_AUTOCREATE"
-	envTopicRetentionDuration := "PUBLISHER_PUBSUB_TOPIC_RETENTION_MS"
-	envPublishDelayThreshold := "PUBLISHER_PUBSUB_PUBLISH_DELAY_THRESHOLD_MS"
-	envPublishCountThreshold := "PUBLISHER_PUBSUB_PUBLISH_COUNT_THRESHOLD"
-	envPublishByteThreshold := "PUBLISHER_PUBSUB_PUBLISH_BYTE_THRESHOLD"
-	envPublishTimeout := "PUBLISHER_PUBSUB_PUBLISH_TIMEOUT_MS"
+	var (
+		envCredentialsFile        = "PUBLISHER_PUBSUB_CREDENTIALS"
+		envTopicAutoCreate        = "PUBLISHER_PUBSUB_TOPIC_AUTOCREATE"
+		envTopicRetentionDuration = "PUBLISHER_PUBSUB_TOPIC_RETENTION_MS"
+		envPublishDelayThreshold  = "PUBLISHER_PUBSUB_PUBLISH_DELAY_THRESHOLD_MS"
+		envPublishCountThreshold  = "PUBLISHER_PUBSUB_PUBLISH_COUNT_THRESHOLD"
+		envPublishByteThreshold   = "PUBLISHER_PUBSUB_PUBLISH_BYTE_THRESHOLD"
+		envPublishTimeout         = "PUBLISHER_PUBSUB_PUBLISH_TIMEOUT_MS"
+	)

 	defaultCredentials := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
 	if strings.TrimSpace(defaultCredentials) != "" {
@@ -97,6 +112,49 @@ func publisherPubSubLoader() {
 	}
 }

+func publisherKinesisLoader() {
+	var (
+		envAWSRegion           = "PUBLISHER_KINESIS_AWS_REGION"
+		envCredentialsFile     = "PUBLISHER_KINESIS_CREDENTIALS"
+		envStreamProbeInterval = "PUBLISHER_KINESIS_STREAM_PROBE_INTERVAL_MS"
+		envStreamAutoCreate    = "PUBLISHER_KINESIS_STREAM_AUTOCREATE"
+		envStreamMode          = "PUBLISHER_KINESIS_STREAM_MODE"
+		envStreamDefaultShards = "PUBLISHER_KINESIS_STREAM_SHARDS"
+		envPublishTimeout      = "PUBLISHER_KINESIS_PUBLISH_TIMEOUT_MS"
+	)
+
+	defaultRegion := os.Getenv("AWS_REGION")
+	if strings.TrimSpace(defaultRegion) != "" {
+		viper.SetDefault(envAWSRegion, defaultRegion)
+	}
+
+	home, err := os.UserHomeDir()
+	if err != nil {
+		panic(
+			fmt.Sprintf("unable to locate user home directory: %v", err),
+		)
+	}
+	defaultCredentials := filepath.Join(home, ".aws", "credentials")
+
+	viper.SetDefault(envCredentialsFile, defaultCredentials)
+	viper.SetDefault(envStreamProbeInterval, "1000")
+	viper.SetDefault(envStreamAutoCreate, "false")
+	viper.SetDefault(envStreamMode, "ON_DEMAND")
+	viper.SetDefault(envStreamDefaultShards, "4")
+	viper.SetDefault(envPublishTimeout, "60000")
+
+	PublisherKinesis = publisherKinesis{
+		Region:              util.MustGetString(envAWSRegion),
+		CredentialsFile:     util.MustGetString(envCredentialsFile),
+		StreamAutoCreate:    util.MustGetBool(envStreamAutoCreate),
+		StreamProbeInterval: util.MustGetDuration(envStreamProbeInterval, time.Millisecond),
+		StreamMode:          util.MustGetString(envStreamMode),
+		DefaultShards:       uint32(util.MustGetInt(envStreamDefaultShards)),
+		PublishTimeout:      util.MustGetDuration(envPublishTimeout, time.Millisecond),
+	}
+
+}
+
 func publisherConfigLoader() {
 	viper.SetDefault("PUBLISHER_TYPE", "kafka")
@@ -111,5 +169,7 @@ func publisherConfigLoader() {
 		publisherKafkaConfigLoader()
 	case "pubsub":
 		publisherPubSubLoader()
+	case "kinesis":
+		publisherKinesisLoader()
 	}
 }
diff --git a/docker-compose.yml b/docker-compose.yml
index e0877c2d..f07199c8 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -4,6 +4,14 @@ networks:
   cs-network:

 services:
+  localstack:
+    image: localstack/localstack:3.5.0
+    container_name: localstack-main
+    ports:
+      - "0.0.0.0:4566:4566"
+      # - "4510-4599"
+    volumes:
+      - "/var/run/docker.sock:/var/run/docker.sock"
   pubsub-emulator:
     image: gcr.io/google.com/cloudsdktool/google-cloud-cli:480.0.0
     hostname: pubsub-emulator
diff --git a/docs/docs/reference/configurations.md b/docs/docs/reference/configurations.md
index eeb5c09e..1c8303b9 100644
--- a/docs/docs/reference/configurations.md
+++ b/docs/docs/reference/configurations.md
@@ -199,7 +199,7 @@ Publisher specific configuration follows the pattern `PUBLISHER_${TYPE}_*` where

 - Type `Optional`
 - Default value: `kafka`
-- Possible values: `kafka`, `pubsub`
+- Possible values: `kafka`, `pubsub`, `kinesis`

 ### `PUBLISHER_KAFKA_CLIENT_BOOTSTRAP_SERVERS`

@@ -313,6 +313,62 @@ How long to wait before aborting a publish operation.
 - Type `Optional`
 - Default value `60000` (1 Minute)

+### `PUBLISHER_KINESIS_AWS_REGION`
+
+AWS region of the target Kinesis stream. The value of `AWS_REGION` is used as a fallback if this variable is not set.
+
+- Type `Required`
+
+### `PUBLISHER_KINESIS_CREDENTIALS`
+
+Path to [AWS Credentials file](https://docs.aws.amazon.com/sdkref/latest/guide/file-format.html).
+
+You can also specify the credentials using the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables.
+
+- Type `Optional`
+- Default value `$HOME/.aws/credentials`
+
+### `PUBLISHER_KINESIS_STREAM_AUTOCREATE`
+
+Whether Raccoon should create a stream if it doesn't exist.
+
+NOTE: We recommend that you create all streams that you need to publish to ahead of time.
+
+- Type `Optional`
+- Default value `false`
+
+### `PUBLISHER_KINESIS_STREAM_MODE`
+
+This configuration variable controls the `StreamMode` of the
+streams created by Raccoon.
+
+- Type `Optional`
+- Default value `ON_DEMAND`
+- Possible values: `ON_DEMAND`, `PROVISIONED`
+
+### `PUBLISHER_KINESIS_STREAM_SHARDS`
+
+This controls the number of shards configured for a stream created by Raccoon.
+
+- Type `Optional`
+- Default value `4`
+
+### `PUBLISHER_KINESIS_STREAM_PROBE_INTERVAL_MS`
+
+This specifies the time delay between stream status checks.
+
+- Type `Optional`
+- Default value `1000`
+
+
+### `PUBLISHER_KINESIS_PUBLISH_TIMEOUT_MS`
+
+How long to wait before aborting a publish operation.
+
+- Type `Optional`
+- Default value `60000`
+
+
 ## Metric

 ### `METRIC_RUNTIME_STATS_RECORD_INTERVAL_MS`
diff --git a/docs/docs/reference/metrics.md b/docs/docs/reference/metrics.md
index 229665c1..450e33c5 100644
--- a/docs/docs/reference/metrics.md
+++ b/docs/docs/reference/metrics.md
@@ -6,6 +6,8 @@ Raccoon uses Statsd protocol as way to report metrics. You can capture the metrics

 - [Server Connection](metrics.md#server-connection)
 - [Kafka Publisher](metrics.md#kafka-publisher)
+- [PubSub Publisher](metrics.md#pubsub-publisher)
+- [Kinesis Publisher](metrics.md#kinesis-publisher)
 - [Resource Usage](metrics.md#resource-usage)
 - [Event Delivery](metrics.md#event-delivery)

@@ -163,6 +165,30 @@ Number of delivery failure caused by topic does not exist in PubSub.
 - Type: `Count`
 - Tags: `topic=topicname` `event_type=*` `conn_group=*`

+## Kinesis Publisher
+
+### `kinesis_messages_delivered_total`
+
+Number of delivered events to Kinesis. The metric also contains false increments. To find the true value, one should use the difference between this and the `kinesis_messages_undelivered_total` metric for the same tag/labels.
+
+- Type: `Count`
+- Tags: `success=false` `success=true` `conn_group=*` `event_type=*`
+
+### `kinesis_messages_undelivered_total`
+
+The count of false increments done by `kinesis_messages_delivered_total`. To be used in conjunction with the former for accurate metrics.
+
+- Type: `Count`
+- Tags: `success=false` `success=true` `conn_group=*` `event_type=*`
+
+
+### `kinesis_unknown_stream_failure_total`
+
+Number of delivery failures caused by the target stream not existing in Kinesis.
+
+- Type: `Count`
+- Tags: `stream=streamname` `event_type=*` `conn_group=*`
+
 ## Resource Usage

 ### `server_mem_gc_triggered_current`
diff --git a/go.mod b/go.mod
index bbada876..376c5054 100644
--- a/go.mod
+++ b/go.mod
@@ -25,6 +25,21 @@ require (
 	cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect
 	cloud.google.com/go/compute/metadata v0.3.0 // indirect
 	cloud.google.com/go/iam v1.1.7 // indirect
+	github.com/aws/aws-sdk-go-v2 v1.27.2 // indirect
+	github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
+	github.com/aws/aws-sdk-go-v2/config v1.27.18 // indirect
+	github.com/aws/aws-sdk-go-v2/credentials v1.17.18 // indirect
+	github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.5 // indirect
+	github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.9 // indirect
+	github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.9 // indirect
+	github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
+	github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect
+	github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.11 // indirect
+	github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.10 // indirect
+	github.com/aws/aws-sdk-go-v2/service/sso v1.20.11 // indirect
+	github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.5 // indirect
+	github.com/aws/aws-sdk-go-v2/service/sts v1.28.12 // indirect
+	github.com/aws/smithy-go v1.20.2 // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
@@ -38,6 +53,7 @@ require (
 	github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
 	github.com/googleapis/gax-go/v2 v2.12.4 // indirect
 	github.com/hashicorp/hcl v1.0.0 // indirect
+	github.com/jmespath/go-jmespath v0.4.0 // indirect
 	github.com/magiconair/properties v1.8.7 // indirect
 	github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
 	github.com/mitchellh/mapstructure v1.5.0 // indirect
diff --git a/go.sum b/go.sum
index 43411d30..a02ef98d 100644
--- a/go.sum
+++ b/go.sum
@@ -56,6 +56,36 @@ github.com/actgardner/gogen-avro/v10 v10.1.0/go.mod h1:o+ybmVjEa27AAr35FRqU98DJu
 github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ=
 github.com/actgardner/gogen-avro/v9 v9.1.0/go.mod h1:nyTj6wPqDJoxM3qdnjcLv+EnMDSDFqE0qDpva2QRmKc=
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
+github.com/aws/aws-sdk-go-v2 v1.27.2 h1:pLsTXqX93rimAOZG2FIYraDQstZaaGVVN4tNw65v0h8=
+github.com/aws/aws-sdk-go-v2 v1.27.2/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM=
+github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 h1:x6xsQXGSmW6frevwDA+vi/wqhp1ct18mVXYN08/93to=
+github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2/go.mod h1:lPprDr1e6cJdyYeGXnRaJoP4Md+cDBvi2eOj00BlGmg=
+github.com/aws/aws-sdk-go-v2/config v1.27.18 h1:wFvAnwOKKe7QAyIxziwSKjmer9JBMH1vzIL6W+fYuKk=
+github.com/aws/aws-sdk-go-v2/config v1.27.18/go.mod h1:0xz6cgdX55+kmppvPm2IaKzIXOheGJhAufacPJaXZ7c=
+github.com/aws/aws-sdk-go-v2/credentials v1.17.18 h1:D/ALDWqK4JdY3OFgA2thcPO1c9aYTT5STS/CvnkqY1c=
+github.com/aws/aws-sdk-go-v2/credentials v1.17.18/go.mod h1:JuitCWq+F5QGUrmMPsk945rop6bB57jdscu+Glozdnc=
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.5 h1:dDgptDO9dxeFkXy+tEgVkzSClHZje/6JkPW5aZyEvrQ=
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.5/go.mod h1:gjvE2KBUgUQhcv89jqxrIxH9GaKs1JbZzWejj/DaHGA=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.9 h1:cy8ahBJuhtM8GTTSyOkfy6WVPV1IE+SS5/wfXUYuulw=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.9/go.mod h1:CZBXGLaJnEZI6EVNcPd7a6B5IC5cA/GkRWtu9fp3S6Y=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.9 h1:A4SYk07ef04+vxZToz9LWvAXl9LW0NClpPpMsi31cz0=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.9/go.mod h1:5jJcHuwDagxN+ErjQ3PU3ocf6Ylc/p9x+BLO/+X4iXw=
+github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU=
+github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY=
+github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 h1:Ji0DY1xUsUr3I8cHps0G+XM3WWU16lP6yG8qu1GAZAs=
+github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2/go.mod h1:5CsjAbs3NlGQyZNFACh+zztPDI7fU6eW9QsxjfnuBKg=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.11 h1:o4T+fKxA3gTMcluBNZZXE9DNaMkJuUL1O3mffCUjoJo=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.11/go.mod h1:84oZdJ+VjuJKs9v1UTC9NaodRZRseOXCTgku+vQJWR8=
+github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.10 h1:lmp5qBDoJCLsPwKrYNe6zbHnNvW5jzz/xS+H0jkoSYg=
+github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.10/go.mod h1:CUWfw8B25XToRN7+sg092F9Ywjvz0PT4veHXBQ2KE0A=
+github.com/aws/aws-sdk-go-v2/service/sso v1.20.11 h1:gEYM2GSpr4YNWc6hCd5nod4+d4kd9vWIAWrmGuLdlMw=
+github.com/aws/aws-sdk-go-v2/service/sso v1.20.11/go.mod h1:gVvwPdPNYehHSP9Rs7q27U1EU+3Or2ZpXvzAYJNh63w=
+github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.5 h1:iXjh3uaH3vsVcnyZX7MqCoCfcyxIrVE9iOQruRaWPrQ=
+github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.5/go.mod h1:5ZXesEuy/QcO0WUnt+4sDkxhdXRHTu2yG0uCSH8B6os=
+github.com/aws/aws-sdk-go-v2/service/sts v1.28.12 h1:M/1u4HBpwLuMtjlxuI2y6HoVLzF5e2mfxHCg7ZVMYmk=
+github.com/aws/aws-sdk-go-v2/service/sts v1.28.12/go.mod h1:kcfd+eTdEi/40FIbLq4Hif3XMXnl5b/+t/KTfLt9xIk=
+github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q=
+github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@@ -208,6 +238,9 @@ github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+
 github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ=
 github.com/jhump/protoreflect v1.11.0/go.mod h1:U7aMIjN0NWq9swDP7xDdoMfRHb35uiuTd3Z9nFXJf5E=
 github.com/jhump/protoreflect v1.12.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI=
+github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
+github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
+github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
 github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
 github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
diff --git a/metrics/prometheus.go b/metrics/prometheus.go
index 8885d6eb..5551a187 100644
--- a/metrics/prometheus.go
+++ b/metrics/prometheus.go
@@ -151,6 +151,12 @@ func getCounterMap() map[string]CounterVec {
 	counters["pubsub_messages_undelivered_total"] = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Name: "pubsub_messages_undelivered_total",
 		Help: "Number of delivered events to PubSub which failed while reading delivery report"}, []string{"success", "conn_group", "event_type"})
+	counters["kinesis_messages_delivered_total"] = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "kinesis_messages_delivered_total",
+		Help: "Number of delivered events to Kinesis"}, []string{"success", "conn_group", "event_type"})
+	counters["kinesis_messages_undelivered_total"] = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "kinesis_messages_undelivered_total",
+		Help: "Number of delivered events to Kinesis which failed while reading delivery report"}, []string{"success", "conn_group", "event_type"})
 	counters["events_rx_bytes_total"] = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Name: "events_rx_bytes_total",
 		Help: "Total byte receieved in requests"}, []string{"conn_group", "event_type"})
@@ -163,6 +169,9 @@ func getCounterMap() map[string]CounterVec {
 	counters["pubsub_unknown_topic_failure_total"] = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Name: "pubsub_unknown_topic_failure_total",
 		Help: "Number of delivery failure caused by topic does not exist in PubSub."}, []string{"topic", "event_type", "conn_group"})
+	counters["kinesis_unknown_stream_failure_total"] = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "kinesis_unknown_stream_failure_total",
+		Help: "Number of delivery failures caused by the target stream not existing in Kinesis."}, []string{"stream", "event_type", "conn_group"})
 	counters["batches_read_total"] = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Name: "batches_read_total",
 		Help: "Request count"}, []string{"status", "reason", "conn_group"})
@@ -267,6 +276,10 @@ func getHistogramMap() map[string]HistogramVec {
 		Name:    "pubsub_producebulk_tt_ms",
 		Buckets: []float64{5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000},
 	}, []string{})
+	histogram["kinesis_producebulk_tt_ms"] = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Name:    "kinesis_producebulk_tt_ms",
+		Buckets: []float64{5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000},
+	}, []string{})
 	histogram["event_processing_duration_milliseconds"] = prometheus.NewHistogramVec(prometheus.HistogramOpts{
 		Name: "event_processing_duration_milliseconds",
 		Help: "Duration from the time request is sent to the time events are published. This metric is calculated per event by following formula (PublishedTime - SentTime)/CountEvents",
diff --git a/metrics/prometheus_test.go b/metrics/prometheus_test.go
index ba5174cd..1ed29b6e 100644
--- a/metrics/prometheus_test.go
+++ b/metrics/prometheus_test.go
@@ -118,9 +118,9 @@ func (m *mockObserver) Observe(f float64) {

 func (promSuite *PrometheusTestSuite) Test_Prometheus_Collector_Metrics_Initialised() {
 	// NOTE(turtledev): what are we even testing here?
-	numCounters := 15
+	numCounters := 18
 	numGauge := 15
-	numHistogram := 9
+	numHistogram := 10
 	var err error
 	promSuite.instrument, err = initPrometheusCollector()
 	assert.NoError(promSuite.T(), err, "error while initialising prometheus collector")
diff --git a/publisher/kafka/kafka.go b/publisher/kafka/kafka.go
index dcf5d985..d93c14e5 100644
--- a/publisher/kafka/kafka.go
+++ b/publisher/kafka/kafka.go
@@ -86,14 +86,14 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string) error {
 	}

 	// Wait for deliveryChannel as many as processed
-	for i := range totalProcessed {
+	for range totalProcessed {
 		d := <-deliveryChannel
 		m := d.(*kafka.Message)
 		if m.TopicPartition.Error != nil {
-			eventType := events[i].Type
+			order := m.Opaque.(int)
+			eventType := events[order].Type
 			metrics.Increment("kafka_messages_undelivered_total", map[string]string{"success": "true", "conn_group": connGroup, "event_type": eventType})
 			metrics.Increment("kafka_messages_delivered_total", map[string]string{"success": "false", "conn_group": connGroup, "event_type": eventType})
-			order := m.Opaque.(int)
 			errors[order] = m.TopicPartition.Error
 		}
 	}
diff --git a/publisher/kafka/kafka_test.go b/publisher/kafka/kafka_test.go
index a5f2361d..878b09aa 100644
--- a/publisher/kafka/kafka_test.go
+++ b/publisher/kafka/kafka_test.go
@@ -2,6 +2,7 @@ package kafka

 import (
 	"fmt"
+	"io"
 	"os"
 	"testing"

@@ -17,13 +18,8 @@ const (
 	group1 = "group-1"
 )

-type void struct{}
-
-func (v void) Write(_ []byte) (int, error) {
-	return 0, nil
-}
 func TestMain(t *testing.M) {
-	logger.SetOutput(void{})
+	logger.SetOutput(io.Discard)
 	os.Exit(t.Run())
 }
diff --git a/publisher/kinesis/kinesis.go b/publisher/kinesis/kinesis.go
new file mode 100644
index 00000000..7d0bd477
--- /dev/null
+++ b/publisher/kinesis/kinesis.go
@@ -0,0 +1,282 @@
+package kinesis
+
+import (
+	"cmp"
+	"context"
+	"errors"
+	"fmt"
+	"math/rand"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/service/kinesis"
+	"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
+	"github.com/raystack/raccoon/metrics"
+	pb "github.com/raystack/raccoon/proto"
+	"github.com/raystack/raccoon/publisher"
+)
+
+var globalCtx = context.Background()
+
+type Publisher struct {
+	client *kinesis.Client
+
+	streamLock          sync.RWMutex
+	streams             map[string]bool
+	streamPattern       string
+	streamAutocreate    bool
+	streamProbeInterval time.Duration
+	streamMode          types.StreamMode
+	defaultShardCount   int32
+	publishTimeout      time.Duration
+}
+
+func (p *Publisher) ProduceBulk(events []*pb.Event, connGroup string) error {
+
+	ctx, cancel := context.WithTimeout(globalCtx, p.publishTimeout)
+	defer cancel()
+	errors := make([]error, len(events))
+
+	for order, event := range events {
+		streamName := strings.Replace(p.streamPattern, "%s", event.Type, 1)
+
+		// only check for stream existence if the publisher is configured
+		// to create streams. If the target stream doesn't exist, then
+		// PutRecord will return an error anyway.
+		if p.streamAutocreate {
+			err := p.ensureStream(ctx, streamName)
+			if err != nil {
+				metrics.Increment(
+					"kinesis_messages_delivered_total",
+					map[string]string{
+						"success":    "false",
+						"conn_group": connGroup,
+						"event_type": event.Type,
+					},
+				)
+				if p.isErrNotFound(err) {
+					metrics.Increment(
+						"kinesis_unknown_stream_failure_total",
+						map[string]string{
+							"stream":     streamName,
+							"conn_group": connGroup,
+							"event_type": event.Type,
+						},
+					)
+				}
+				errors[order] = err
+				continue
+			}
+		}
+
+		metrics.Increment(
+			"kinesis_messages_delivered_total",
+			map[string]string{
+				"success":    "true",
+				"conn_group": connGroup,
+				"event_type": event.Type,
+			},
+		)
+
+		partitionKey := fmt.Sprintf("%d", rand.Int31())
+		_, err := p.client.PutRecord(
+			ctx,
+			&kinesis.PutRecordInput{
+				Data:         event.EventBytes,
+				PartitionKey: aws.String(partitionKey),
+				StreamName:   aws.String(streamName),
+			},
+		)
+
+		if err != nil {
+			metrics.Increment(
+				"kinesis_messages_delivered_total",
+				map[string]string{
+					"success":    "false",
+					"conn_group": connGroup,
+					"event_type": event.Type,
+				},
+			)
+			metrics.Increment(
+				"kinesis_messages_undelivered_total",
+				map[string]string{
+					"success":    "true",
+					"conn_group": connGroup,
+					"event_type": event.Type,
+				},
+			)
+			if p.isErrNotFound(err) {
+				metrics.Increment(
+					"kinesis_unknown_stream_failure_total",
+					map[string]string{
+						"stream":     streamName,
+						"conn_group": connGroup,
+						"event_type": event.Type,
+					},
+				)
+			}
+			errors[order] = err
+			continue
+		}
+	}
+	if cmp.Or(errors...) != nil {
+		return &publisher.BulkError{Errors: errors}
+	}
+	return nil
+}
+
+func (p *Publisher) ensureStream(ctx context.Context, name string) error {
+
+	p.streamLock.RLock()
+	exists := p.streams[name]
+	p.streamLock.RUnlock()
+
+	if exists {
+		return nil
+	}
+	p.streamLock.Lock()
+	defer p.streamLock.Unlock()
+
+	stream, err := p.client.DescribeStreamSummary(
+		ctx,
+		&kinesis.DescribeStreamSummaryInput{
+			StreamName: aws.String(name),
+		},
+	)
+
+	if err != nil {
+		if !p.isErrNotFound(err) {
+			return err
+		}
+
+		_, err := p.client.CreateStream(
+			ctx,
+			&kinesis.CreateStreamInput{
+				StreamName: aws.String(name),
+				ShardCount: aws.Int32(p.defaultShardCount),
+				StreamModeDetails: &types.StreamModeDetails{
+					StreamMode: p.streamMode,
+				},
+			},
+		)
+		if err != nil {
+			return err
+		}
+		stream, err = p.client.DescribeStreamSummary(
+			ctx,
+			&kinesis.DescribeStreamSummaryInput{
+				StreamName: aws.String(name),
+			},
+		)
+		if err != nil {
+			return err
+		}
+	}
+
+	for stream.StreamDescriptionSummary.StreamStatus != types.StreamStatusActive {
+		time.Sleep(p.streamProbeInterval)
+		stream, err = p.client.DescribeStreamSummary(
+			ctx,
+			&kinesis.DescribeStreamSummaryInput{
+				StreamName: aws.String(name),
+			},
+		)
+		if err != nil {
+			return err
+		}
+	}
+
+	p.streams[name] = true
+	return nil
+}
+
+func (*Publisher) isErrNotFound(e error) bool {
+	var (
+		errNotFound   *types.ResourceNotFoundException
+		isErrNotFound = errors.As(e, &errNotFound)
+	)
+	return isErrNotFound
+}
+
+func (*Publisher) Name() string { return "kinesis" }
+func (*Publisher) Close() error { return nil }
+
+type Opt func(*Publisher) error
+
+func WithStreamAutocreate(autocreate bool) Opt {
+	return func(p *Publisher) error {
+		p.streamAutocreate = autocreate
+		return nil
+	}
+}
+
+func WithStreamMode(mode types.StreamMode) Opt {
+
+	validModesList := types.StreamMode("").Values()
+	validModes := map[types.StreamMode]bool{}
+	for _, m := range validModesList {
+		validModes[types.StreamMode(m)] = true
+	}
+
+	return func(p *Publisher) error {
+		valid := validModes[mode]
+		if !valid {
+			return fmt.Errorf(
+				"unknown stream mode: %q (valid values: %v)",
+				mode,
+				validModesList,
+			)
+		}
+		p.streamMode = mode
+		return nil
+	}
+}
+
+func WithShards(n uint32) Opt {
+	return func(p *Publisher) error {
+		p.defaultShardCount = int32(n)
+		return nil
+	}
+}
+
+func WithStreamPattern(pattern string) Opt {
+	return func(p *Publisher) error {
+		p.streamPattern = pattern
+		return nil
+	}
+}
+
+func WithPublishTimeout(timeout time.Duration) Opt {
+	return func(p *Publisher) error {
+		p.publishTimeout = timeout
+		return nil
+	}
+}
+
+func WithStreamProbeInterval(interval time.Duration) Opt {
+	return func(p *Publisher) error {
+		p.streamProbeInterval = interval
+		return nil
+	}
+}
+
+func New(client *kinesis.Client, opts ...Opt) (*Publisher, error) {
+	p := &Publisher{
+		client:              client,
+		streamPattern:       "%s",
+		defaultShardCount:   1,
+		streamProbeInterval: time.Second,
+		streamMode:          types.StreamModeOnDemand,
+		streams:             make(map[string]bool),
+		publishTimeout:      time.Minute,
+	}
+	for _, opt := range opts {
+		err := opt(p)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return p, nil
+}
diff --git a/publisher/kinesis/kinesis_test.go b/publisher/kinesis/kinesis_test.go
new file mode 100644
index 00000000..98e1fd09
--- /dev/null
+++ b/publisher/kinesis/kinesis_test.go
@@ -0,0 +1,296 @@
+package kinesis_test
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/config"
+	kinesis_sdk "github.com/aws/aws-sdk-go-v2/service/kinesis"
+	"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
+	"github.com/raystack/raccoon/logger"
+	pb "github.com/raystack/raccoon/proto"
+	"github.com/raystack/raccoon/publisher/kinesis"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	envLocalstackHost = "LOCALSTACK_HOST"
+)
+
+type localstackProvider struct{}
+
+func (p *localstackProvider) Retrieve(ctx context.Context) (aws.Credentials, error) {
+	return aws.Credentials{
+		AccessKeyID:     "test",
+		SecretAccessKey: "test",
+	}, nil
+}
+
+func withLocalStack(host string) func(o *kinesis_sdk.Options) {
+	return func(o *kinesis_sdk.Options) {
+		o.BaseEndpoint = aws.String(host)
+		o.Credentials = &localstackProvider{}
+	}
+}
+
+var (
+	testEvent = &pb.Event{
+		EventBytes: []byte("EVENT"),
+		Type:       "click",
+	}
+)
+
+func createStream(client *kinesis_sdk.Client, name string) (string, error) {
+	_, err := client.CreateStream(
+		context.Background(),
+		&kinesis_sdk.CreateStreamInput{
+			StreamName: aws.String(name),
+			StreamModeDetails: &types.StreamModeDetails{
+				StreamMode: types.StreamModeOnDemand,
+			},
+			ShardCount: aws.Int32(1),
+		},
+	)
+	if err != nil {
+		return "", err
+	}
+	retries := 5
+	for range retries {
+		stream, err := client.DescribeStreamSummary(
+			context.Background(),
+			&kinesis_sdk.DescribeStreamSummaryInput{
+				StreamName: aws.String(name),
+			},
+		)
+		if err != nil {
+			return "", err
+		}
+		if stream.StreamDescriptionSummary.StreamStatus == types.StreamStatusActive {
+			return *stream.StreamDescriptionSummary.StreamARN, nil
+		}
+		time.Sleep(time.Second / 2)
+	}
+	return "", fmt.Errorf("timed out waiting for stream to get ready")
+}
+
+func deleteStream(client *kinesis_sdk.Client, name string) error {
+	_, err := client.DeleteStream(context.Background(), &kinesis_sdk.DeleteStreamInput{
+		StreamName: aws.String(name),
+	})
+	if err != nil {
+		return err
+	}
+
+	var errNotFound *types.ResourceNotFoundException
+	for !errors.As(err, &errNotFound) {
+		_, err = client.DescribeStreamSummary(
+			context.Background(),
+			&kinesis_sdk.DescribeStreamSummaryInput{
+				StreamName: aws.String(name),
+			},
+		)
+		time.Sleep(time.Second / 2)
+	}
+
+	return nil
+}
+
+func getStreamMode(client *kinesis_sdk.Client, name string) (types.StreamMode, error) {
+	stream, err := client.DescribeStreamSummary(
+		context.Background(),
+		&kinesis_sdk.DescribeStreamSummaryInput{
+			StreamName: aws.String(name),
+		},
+	)
+	if err != nil {
+		return "", err
+	}
+	return stream.StreamDescriptionSummary.StreamModeDetails.StreamMode, nil
+}
+
+func readStream(client *kinesis_sdk.Client, arn string) ([][]byte, error) {
+	stream, err := client.DescribeStream(
+		context.Background(),
+		&kinesis_sdk.DescribeStreamInput{
+			StreamARN: aws.String(arn),
+		},
+	)
+	if err != nil {
+		return nil, err
+	}
+	if len(stream.StreamDescription.Shards) == 0 {
+		return nil, fmt.Errorf("stream %q has no shards", arn)
+	}
+	iter, err := client.GetShardIterator(
+		context.Background(),
+		&kinesis_sdk.GetShardIteratorInput{
+			ShardId:           stream.StreamDescription.Shards[0].ShardId,
+			StreamARN:         aws.String(arn),
+			ShardIteratorType: types.ShardIteratorTypeTrimHorizon,
+		},
+	)
+	if err != nil {
+		return nil, err
+	}
+	res, err := client.GetRecords(
+		context.Background(),
+		&kinesis_sdk.GetRecordsInput{
+			StreamARN:     aws.String(arn),
+			ShardIterator: iter.ShardIterator,
+		},
+	)
+	if err != nil {
+		return nil, err
+	}
+	if len(res.Records) == 0 {
+		return nil, fmt.Errorf("got empty response")
+	}
+	rv := [][]byte{}
+	for _, record := range res.Records {
+		rv = append(rv, record.Data)
+	}
+	return rv, nil
+}
+
+func TestKinesisProducer(t *testing.T) {
+
+	localstackHost := os.Getenv(envLocalstackHost)
+	if strings.TrimSpace(localstackHost) == "" {
+		t.Errorf("cannot run tests because %s env variable is not set", envLocalstackHost)
+		return
+	}
+	cfg, err := config.LoadDefaultConfig(context.Background())
+	require.NoError(t, err, "error loading aws config")
+
+	client := kinesis_sdk.NewFromConfig(cfg, withLocalStack(localstackHost))
+
+	t.Run("should return an error if stream doesn't exist", func(t *testing.T) {
+		pub, err := kinesis.New(client)
+		require.NoError(t, err)
+		err = pub.ProduceBulk([]*pb.Event{testEvent}, "conn_group")
+		require.Error(t, err)
+
+	})
+
+	t.Run("should return an error if an invalid stream mode is specified", func(t *testing.T) {
+		_, err := kinesis.New(
+			client,
+			kinesis.WithStreamMode("INVALID"),
+		)
+		require.Error(t, err)
+	})
+
+	t.Run("should publish message to kinesis", func(t *testing.T) {
+		streamARN, err := createStream(client, testEvent.Type)
+		require.NoError(t, err)
+		defer deleteStream(client, testEvent.Type)
+
+		pub, err := kinesis.New(client)
+		require.NoError(t, err)
+		err = pub.ProduceBulk([]*pb.Event{testEvent}, "conn_group")
+		require.NoError(t, err)
+		events, err := readStream(client, streamARN)
+		require.NoError(t, err)
+		require.Len(t, events, 1)
+		require.Equal(t, events[0], testEvent.EventBytes)
+	})
+	t.Run("stream auto creation", func(t *testing.T) {
+		t.Run("should create the stream if it doesn't exist and autocreate is set to true", func(t *testing.T) {
+			pub, err := kinesis.New(client, kinesis.WithStreamAutocreate(true))
+			require.NoError(t, err)
+
+			err = pub.ProduceBulk([]*pb.Event{testEvent}, "conn_group")
+			require.NoError(t, err)
+			deleteStream(client, testEvent.Type)
+		})
+		t.Run("should create the stream with mode = ON_DEMAND (default)", func(t *testing.T) {
+
+			pub, err := kinesis.New(client, kinesis.WithStreamAutocreate(true))
+			require.NoError(t, err)
+			err = pub.ProduceBulk([]*pb.Event{testEvent}, "conn_group")
+			require.NoError(t, err)
+			defer deleteStream(client, testEvent.Type)
+
+			mode, err := getStreamMode(client, testEvent.Type)
+			require.NoError(t, err)
+			require.Equal(t, mode, types.StreamModeOnDemand)
+		})
+		t.Run("should create the stream with mode = PROVISIONED", func(t *testing.T) {
+			pub, err := kinesis.New(
+				client,
+				kinesis.WithStreamAutocreate(true),
+				kinesis.WithStreamMode(types.StreamModeProvisioned),
+			)
+			require.NoError(t, err)
+			err = pub.ProduceBulk([]*pb.Event{testEvent}, "conn_group")
+			require.NoError(t, err)
+			defer deleteStream(client, testEvent.Type)
+
+			mode, err := getStreamMode(client, testEvent.Type)
+			require.NoError(t, err)
+			require.Equal(t, mode, types.StreamModeProvisioned)
+		})
+		t.Run("should create stream with specified number of shards", func(t *testing.T) {
+			shards := 5
+			pub, err := kinesis.New(
+				client,
+				kinesis.WithStreamAutocreate(true),
+				kinesis.WithShards(uint32(shards)),
+			)
+			require.NoError(t, err)
+
+			err = pub.ProduceBulk([]*pb.Event{testEvent}, "conn_group")
+			require.NoError(t, err)
+			defer deleteStream(client, testEvent.Type)
+
+			stream, err := client.DescribeStream(
+				context.Background(),
+				&kinesis_sdk.DescribeStreamInput{
+					StreamName: aws.String(testEvent.Type),
+				},
+			)
+			require.NoError(t, err)
+			require.Equal(t, shards, len(stream.StreamDescription.Shards))
+		})
+	})
+
+	t.Run("should publish message according to the stream pattern", func(t *testing.T) {
+		streamPattern := "pre-%s-post"
+		destinationStream := "pre-click-post"
+		_, err := createStream(client, destinationStream)
+		require.NoError(t, err)
+		defer deleteStream(client, destinationStream)
+		pub, err := kinesis.New(
+			client,
+			kinesis.WithStreamPattern(streamPattern),
+		)
+		require.NoError(t, err)
+		err = pub.ProduceBulk([]*pb.Event{testEvent}, "conn_group")
+		require.NoError(t, err)
+	})
+	t.Run("should publish messages to static stream names", func(t *testing.T) {
+		destinationStream := "static"
+		_, err := createStream(client, destinationStream)
+		require.NoError(t, err)
+		defer deleteStream(client, destinationStream)
+		pub, err := kinesis.New(
+			client,
+			kinesis.WithStreamPattern(destinationStream),
+		)
+		require.NoError(t, err)
+		err = pub.ProduceBulk([]*pb.Event{testEvent}, "conn_group")
+		require.NoError(t, err)
+	})
+}
+
+func TestMain(m *testing.M) {
+	logger.SetOutput(io.Discard)
+	os.Exit(m.Run())
+}
diff --git a/publisher/pubsub/error.go b/publisher/pubsub/error.go
new file mode 100644
index 00000000..01dc33e3
--- /dev/null
+++ b/publisher/pubsub/error.go
@@ -0,0 +1,13 @@
+package pubsub
+
+import "fmt"
+
+type unknownTopicError struct {
+	Topic, Project string
+}
+
+func (e *unknownTopicError) Error() string {
+	return fmt.Sprintf(
+		`topic %q doesn't exist in %q project`, e.Topic, e.Project,
+	)
+}
diff --git a/publisher/pubsub/pubsub.go b/publisher/pubsub/pubsub.go
index 5f524522..cc2a8f83 100644
--- a/publisher/pubsub/pubsub.go
+++ b/publisher/pubsub/pubsub.go
@@ -9,11 +9,15 @@ import (
 	"time"

 	"cloud.google.com/go/pubsub"
+	"github.com/googleapis/gax-go/v2/apierror"
 	"github.com/raystack/raccoon/metrics"
 	pb "github.com/raystack/raccoon/proto"
 	"github.com/raystack/raccoon/publisher"
+	"google.golang.org/grpc/codes"
 )

+var globalCtx = context.Background()
+
 // Publisher publishes to a Google Cloud Platform PubSub Topic.
 type Publisher struct {
 	client *pubsub.Client

@@ -34,7 +38,9 @@ type Publisher struct {

 func (p *Publisher) ProduceBulk(events []*pb.Event, connGroup string) error {

-	ctx := context.Background()
+	ctx, cancel := context.WithTimeout(globalCtx, p.publishSettings.Timeout)
+	defer cancel()
+
 	errors := make([]error, len(events))
 	results := make([]*pubsub.PublishResult, len(events))

@@ -51,14 +57,18 @@ func (p *Publisher) ProduceBulk(events []*pb.Event, connGroup string) error {
 					"event_type": event.Type,
 				},
 			)
-			metrics.Increment(
-				"pubsub_unknown_topic_failure_total",
-				map[string]string{
-					"topic":      topicId,
-					"conn_group": connGroup,
-					"event_type": event.Type,
-				},
-			)
+			_, isUnknownTopic := err.(*unknownTopicError)
+			if isUnknownTopic {
+				metrics.Increment(
+					"pubsub_unknown_topic_failure_total",
+					map[string]string{
+						"topic":      topicId,
+						"conn_group": connGroup,
+						"event_type": event.Type,
+					},
+				)
+
+			}
 			errors[order] = err
 			continue
 		}
@@ -138,9 +148,7 @@ func (p *Publisher) topic(ctx context.Context, id string) (*pubsub.Topic, error)

 	if !exists {
 		if !p.autoCreateTopic {
-			return nil, fmt.Errorf(
-				"topic %q doesn't exist in %q project", topic, p.client.Project(),
-			)
+			return nil, &unknownTopicError{Topic: id, Project: p.client.Project()}
 		}

 		cfg := &pubsub.TopicConfig{}
@@ -150,11 +158,16 @@ func (p *Publisher) topic(ctx context.Context, id string) (*pubsub.Topic, error)

 		topic, err = p.client.CreateTopicWithConfig(ctx, id, cfg)
 		if err != nil {
-			return nil, fmt.Errorf("error creating topic %q: %w", id, err)
+			// in case a service replica created this topic before we could
+			if p.isAlreadyExistsError(err) {
+				topic = p.client.Topic(id)
+			} else {
+				return nil, fmt.Errorf("error creating topic %q: %w", id, err)
+			}
 		}
-		topic.PublishSettings = p.publishSettings
 	}

+	topic.PublishSettings = p.publishSettings
 	p.topics[id] = topic
 	return topic, nil
 }
@@ -172,6 +185,14 @@ func (p *Publisher) Name() string {
 	return "pubsub"
 }

+func (p *Publisher) isAlreadyExistsError(e error) bool {
+	apiError, ok := e.(*apierror.APIError)
+	if !ok {
+		return false
+	}
+	return apiError.GRPCStatus().Code() == codes.AlreadyExists
+}
+
 type Opt func(*Publisher)

 func WithTopicAutocreate(autocreate bool) Opt {
diff --git a/publisher/pubsub/pubsub_test.go b/publisher/pubsub/pubsub_test.go
index 392d7598..6353a1c7 100644
--- a/publisher/pubsub/pubsub_test.go
+++ b/publisher/pubsub/pubsub_test.go
@@ -2,7 +2,7 @@ package pubsub_test

 import (
 	"context"
-	"io/ioutil"
+	"io"
 	"os"
 	"strings"
 	"testing"
@@ -29,8 +29,8 @@ var testEvent = &raccoonv1.Event{
 func TestPubSubPublisher(t *testing.T) {
 	host := os.Getenv(envPubsubEmulator)
 	if strings.TrimSpace(host) == "" {
-		t.Logf(
-			"skipping pubsub tests, because %s env variable is not set",
+		t.Errorf(
+			"cannot run tests because %s env variable is not set",
 			envPubsubEmulator,
 		)
 		return
@@ -182,6 +182,6 @@ func TestPubSubPublisher(t *testing.T) {
 }

 func TestMain(m *testing.M) {
-	logger.SetOutput(ioutil.Discard)
+	logger.SetOutput(io.Discard)
 	os.Exit(m.Run())
 }
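
For reviewers, a minimal sketch (not part of the patch) of wiring the new publisher up outside the server, mirroring the `initPublisher` change in `app/server.go` above. The region, option values, and sample event are illustrative assumptions only; the options are the ones this patch defines.

```go
package main

import (
	"context"
	"log"
	"time"

	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	kinesissdk "github.com/aws/aws-sdk-go-v2/service/kinesis"
	"github.com/aws/aws-sdk-go-v2/service/kinesis/types"

	pb "github.com/raystack/raccoon/proto"
	"github.com/raystack/raccoon/publisher/kinesis"
)

func main() {
	// Resolve credentials and region from the default AWS chain
	// (environment variables, ~/.aws/credentials, etc.), as the
	// server does when PUBLISHER_TYPE is "kinesis".
	cfg, err := awsconfig.LoadDefaultConfig(
		context.Background(),
		awsconfig.WithRegion("ap-south-1"), // assumption: any valid region
	)
	if err != nil {
		log.Fatalf("error loading aws config: %v", err)
	}

	pub, err := kinesis.New(
		kinesissdk.NewFromConfig(cfg),
		kinesis.WithStreamPattern("%s"), // "%s" expands to the event type
		kinesis.WithStreamAutocreate(true),
		kinesis.WithStreamMode(types.StreamModeOnDemand),
		kinesis.WithShards(4),
		kinesis.WithPublishTimeout(time.Minute),
		kinesis.WithStreamProbeInterval(time.Second),
	)
	if err != nil {
		log.Fatalf("error creating kinesis publisher: %v", err)
	}
	defer pub.Close()

	// With the "%s" pattern, an event of type "click" lands on the
	// "click" stream. Partition keys are randomised per record, so
	// records spread roughly evenly across the stream's shards.
	err = pub.ProduceBulk([]*pb.Event{
		{EventBytes: []byte("EVENT"), Type: "click"},
	}, "conn_group")
	if err != nil {
		log.Fatalf("publish failed: %v", err)
	}
}
```

The randomised partition key (`rand.Int31()` per record) trades per-key ordering for even shard utilisation, which fits Raccoon's bulk event delivery model.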
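And a hypothetical environment arrangement for selecting the sink, using the variables documented in `docs/docs/reference/configurations.md` above. It is expressed as `os.Setenv` calls purely for illustration; in a deployment these would live in the process environment or an `.env` file, and all values here are examples.

```go
package main

import "os"

// configureKinesisPublisher mirrors the documented defaults; only
// PUBLISHER_KINESIS_AWS_REGION (or the AWS_REGION fallback) is required.
func configureKinesisPublisher() {
	os.Setenv("PUBLISHER_TYPE", "kinesis")
	os.Setenv("PUBLISHER_KINESIS_AWS_REGION", "ap-south-1")
	os.Setenv("PUBLISHER_KINESIS_CREDENTIALS", "/etc/raccoon/aws-credentials") // default: $HOME/.aws/credentials
	os.Setenv("PUBLISHER_KINESIS_STREAM_AUTOCREATE", "true")                   // default: false
	os.Setenv("PUBLISHER_KINESIS_STREAM_MODE", "ON_DEMAND")                    // or PROVISIONED
	os.Setenv("PUBLISHER_KINESIS_STREAM_SHARDS", "4")
	os.Setenv("PUBLISHER_KINESIS_STREAM_PROBE_INTERVAL_MS", "1000")
	os.Setenv("PUBLISHER_KINESIS_PUBLISH_TIMEOUT_MS", "60000")
}
```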