Skip to content

Commit

Permalink
feat: Kinesis sink (#84)
Browse files Browse the repository at this point in the history
* misc: remove use of deprecated ioutil package

* kafka: clean up tests

* publisher: kinesis: add stub

* publisher: kinesis: add testing scaffold

* publisher: fail integration tests on missing configuration

* publisher: pubsub: add docs for future enhancements

* publisher: kinesis: add tests for stream existence

* publisher: kinesis: add support for synchronously deleting streams

* publisher: kinesis: add tests for message production

* publisher: kinesis: use randomised partition keys

* publisher: kinesis: add tests for auto stream creation

* publisher: kinesis: add tests for stream pattern

* publisher: kinesis: cache streams

* publisher: kinesis: optimise stream checks

* ci: add integration test for kinesis

* server: integrate kinesis publisher

* publisher: kinesis: add publish timeout

* app: wrangle config for kinesis publisher

* app: add support for AWS credentials and region

* docs: add configuration for kinesis publisher

* docs: kinesis: add alternative way for specifying credentials

* publisher: kinesis: add metrics

* docs: add kinesis metrics

* metrics: whitelist metrics for kinesis

* kafka publisher: bugfix erroneous metric

* publisher: pubsub: fix unknown topic metric

pubsub publisher was reporting `pubsub_unknown_topic_failure_total`
for errors that were not strictly non-existent topic errors.

* publisher: pubsub: make publish timeout per batch (vs per event)

* publisher: kinesis: disable logs in tests

* publisher: pubsub: handle replica topic creation gracefully

* publisher: kinesis: make metrics consistent with other publishers

* publisher: pubsub: simplify publish settings

* fix: docker sock fallthrough
  • Loading branch information
turtleDev authored Jul 6, 2024
1 parent 15005fd commit cfa245c
Show file tree
Hide file tree
Showing 18 changed files with 899 additions and 41 deletions.
13 changes: 12 additions & 1 deletion .github/workflows/integration-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,15 @@ jobs:
- name: start pubsub emulator
run: docker compose up pubsub-emulator -d
- name: run integration tests
run: PUBSUB_EMULATOR_HOST=localhost:8085 go test ./publisher/pubsub -v
run: PUBSUB_EMULATOR_HOST=localhost:8085 go test ./publisher/pubsub -v
kinesis-test:
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v2
- name: setup integration environment
uses: ./.github/workflows/integration-test-setup
- name: start localstack (kinesis emulator)
run: docker compose up localstack -d
- name: run integration tests
run: LOCALSTACK_HOST=http://localhost:4566 go test ./publisher/kinesis/ -v
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ install:
@go install

test: ## Run the tests
go test $(shell go list ./... | grep -v "vendor" | grep -v "integration") -v
go test $(shell go list ./... | grep -v "vendor" | grep -v "integration" | grep -v "pubsub" | grep -v "kinesis") -v

test-bench: # run benchmark tests
@go test $(shell go list ./... | grep -v "vendor") -v -bench ./... -run=^Benchmark ]
Expand All @@ -63,4 +63,4 @@ vendor: ## Update the vendor directory

docker-run:
docker compose build
docker compose up -d
docker compose up -d
29 changes: 28 additions & 1 deletion app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,22 @@ import (
"syscall"
"time"

pubsubsdk "cloud.google.com/go/pubsub"

"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/config"
"github.com/raystack/raccoon/logger"
"github.com/raystack/raccoon/metrics"
"github.com/raystack/raccoon/publisher"
"github.com/raystack/raccoon/publisher/kafka"
"github.com/raystack/raccoon/publisher/kinesis"
"github.com/raystack/raccoon/publisher/pubsub"
"github.com/raystack/raccoon/services"
"github.com/raystack/raccoon/worker"

pubsubsdk "cloud.google.com/go/pubsub"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
kinesissdk "github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
"google.golang.org/api/option"
)

Expand Down Expand Up @@ -140,6 +146,27 @@ func initPublisher() (Publisher, error) {
pubsub.WithByteThreshold(config.PublisherPubSub.PublishByteThreshold),
pubsub.WithTimeout(config.PublisherPubSub.PublishTimeout),
)
case "kinesis":
cfg, err := awsconfig.LoadDefaultConfig(
context.Background(),
awsconfig.WithRegion(config.PublisherKinesis.Region),
awsconfig.WithSharedConfigFiles(
[]string{config.PublisherKinesis.CredentialsFile},
),
)
if err != nil {
return nil, fmt.Errorf("error locating aws credentials: %w", err)
}
conf := config.PublisherKinesis
return kinesis.New(
kinesissdk.NewFromConfig(cfg),
kinesis.WithStreamPattern(config.EventDistribution.PublisherPattern),
kinesis.WithStreamAutocreate(conf.StreamAutoCreate),
kinesis.WithStreamMode(types.StreamMode(conf.StreamMode)),
kinesis.WithShards(conf.DefaultShards),
kinesis.WithPublishTimeout(conf.PublishTimeout),
kinesis.WithStreamProbleInterval(conf.StreamProbeInterval),
)
default:
return nil, fmt.Errorf("unknown publisher: %v", config.Publisher)
}
Expand Down
74 changes: 67 additions & 7 deletions config/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package config

import (
"bytes"
"fmt"
"os"
"path/filepath"
"strings"
"time"

Expand All @@ -14,6 +16,7 @@ import (
var Publisher string
var PublisherKafka publisherKafka
var PublisherPubSub publisherPubSub
var PublisherKinesis publisherKinesis
var dynamicKafkaClientConfigPrefix = "PUBLISHER_KAFKA_CLIENT_"

type publisherPubSub struct {
Expand All @@ -27,6 +30,16 @@ type publisherPubSub struct {
CredentialsFile string
}

// publisherKinesis holds the settings for the Kinesis publisher. It is
// populated by publisherKinesisLoader from the PUBLISHER_KINESIS_*
// configuration variables.
type publisherKinesis struct {
// Region is the AWS region of the target Kinesis stream
// (PUBLISHER_KINESIS_AWS_REGION).
Region string
// CredentialsFile is the path to the AWS credentials file
// (PUBLISHER_KINESIS_CREDENTIALS).
CredentialsFile string
// StreamAutoCreate controls whether a stream that does not exist should be
// created (PUBLISHER_KINESIS_STREAM_AUTOCREATE).
StreamAutoCreate bool
// StreamProbeInterval is the delay between stream status checks
// (PUBLISHER_KINESIS_STREAM_PROBE_INTERVAL_MS).
StreamProbeInterval time.Duration
// StreamMode is the stream mode used for created streams, e.g. ON_DEMAND
// or PROVISIONED (PUBLISHER_KINESIS_STREAM_MODE).
StreamMode string
// DefaultShards is the number of shards configured for created streams
// (PUBLISHER_KINESIS_STREAM_SHARDS).
DefaultShards uint32
// PublishTimeout is how long to wait before aborting a publish operation
// (PUBLISHER_KINESIS_PUBLISH_TIMEOUT_MS).
PublishTimeout time.Duration
}

type publisherKafka struct {
FlushInterval int
}
Expand Down Expand Up @@ -65,13 +78,15 @@ func publisherKafkaConfigLoader() {
}

func publisherPubSubLoader() {
envCredentialsFile := "PUBLISHER_PUBSUB_CREDENTIALS"
envTopicAutoCreate := "PUBLISHER_PUBSUB_TOPIC_AUTOCREATE"
envTopicRetentionDuration := "PUBLISHER_PUBSUB_TOPIC_RETENTION_MS"
envPublishDelayThreshold := "PUBLISHER_PUBSUB_PUBLISH_DELAY_THRESHOLD_MS"
envPublishCountThreshold := "PUBLISHER_PUBSUB_PUBLISH_COUNT_THRESHOLD"
envPublishByteThreshold := "PUBLISHER_PUBSUB_PUBLISH_BYTE_THRESHOLD"
envPublishTimeout := "PUBLISHER_PUBSUB_PUBLISH_TIMEOUT_MS"
var (
envCredentialsFile = "PUBLISHER_PUBSUB_CREDENTIALS"
envTopicAutoCreate = "PUBLISHER_PUBSUB_TOPIC_AUTOCREATE"
envTopicRetentionDuration = "PUBLISHER_PUBSUB_TOPIC_RETENTION_MS"
envPublishDelayThreshold = "PUBLISHER_PUBSUB_PUBLISH_DELAY_THRESHOLD_MS"
envPublishCountThreshold = "PUBLISHER_PUBSUB_PUBLISH_COUNT_THRESHOLD"
envPublishByteThreshold = "PUBLISHER_PUBSUB_PUBLISH_BYTE_THRESHOLD"
envPublishTimeout = "PUBLISHER_PUBSUB_PUBLISH_TIMEOUT_MS"
)

defaultCredentials := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
if strings.TrimSpace(defaultCredentials) != "" {
Expand All @@ -97,6 +112,49 @@ func publisherPubSubLoader() {
}
}

// publisherKinesisLoader populates PublisherKinesis from the
// PUBLISHER_KINESIS_* configuration variables.
//
// Defaults registered here: the region falls back to AWS_REGION when that
// variable is non-blank, the credentials file falls back to
// <user home>/.aws/credentials, and the remaining settings use the literal
// defaults set below.
//
// It panics when the user home directory cannot be determined, or when the
// configured shard count does not fit in a uint32.
func publisherKinesisLoader() {
	const (
		envAWSRegion           = "PUBLISHER_KINESIS_AWS_REGION"
		envCredentialsFile     = "PUBLISHER_KINESIS_CREDENTIALS"
		envStreamProbeInterval = "PUBLISHER_KINESIS_STREAM_PROBE_INTERVAL_MS"
		envStreamAutoCreate    = "PUBLISHER_KINESIS_STREAM_AUTOCREATE"
		envStreamMode          = "PUBLISHER_KINESIS_STREAM_MODE"
		envStreamDefaultShards = "PUBLISHER_KINESIS_STREAM_SHARDS"
		envPublishTimeout      = "PUBLISHER_KINESIS_PUBLISH_TIMEOUT_MS"

		// maxShards is the largest shard count representable in
		// publisherKinesis.DefaultShards (a uint32).
		maxShards int64 = 1<<32 - 1
	)

	// Only use AWS_REGION as the fallback when it carries a real value.
	if region := os.Getenv("AWS_REGION"); strings.TrimSpace(region) != "" {
		viper.SetDefault(envAWSRegion, region)
	}

	home, err := os.UserHomeDir()
	if err != nil {
		panic(
			fmt.Sprintf("unable to locate user home directory: %v", err),
		)
	}

	viper.SetDefault(envCredentialsFile, filepath.Join(home, ".aws", "credentials"))
	viper.SetDefault(envStreamProbeInterval, "1000")
	viper.SetDefault(envStreamAutoCreate, "false")
	viper.SetDefault(envStreamMode, "ON_DEMAND")
	viper.SetDefault(envStreamDefaultShards, "4")
	viper.SetDefault(envPublishTimeout, "60000")

	// Converting an out-of-range integer to uint32 wraps silently (-1 would
	// become 4294967295), so reject such values before narrowing.
	shards := util.MustGetInt(envStreamDefaultShards)
	if shards < 0 || int64(shards) > maxShards {
		panic(
			fmt.Sprintf(
				"invalid value for %s: %d (must be between 0 and %d)",
				envStreamDefaultShards, shards, maxShards,
			),
		)
	}

	PublisherKinesis = publisherKinesis{
		Region:           util.MustGetString(envAWSRegion),
		CredentialsFile:  util.MustGetString(envCredentialsFile),
		StreamAutoCreate: util.MustGetBool(envStreamAutoCreate),
		// NOTE(review): the *_MS values are presumably scaled by the unit
		// passed to MustGetDuration — confirm against util.
		StreamProbeInterval: util.MustGetDuration(envStreamProbeInterval, time.Millisecond),
		StreamMode:          util.MustGetString(envStreamMode),
		DefaultShards:       uint32(shards),
		PublishTimeout:      util.MustGetDuration(envPublishTimeout, time.Millisecond),
	}
}

func publisherConfigLoader() {

viper.SetDefault("PUBLISHER_TYPE", "kafka")
Expand All @@ -111,5 +169,7 @@ func publisherConfigLoader() {
publisherKafkaConfigLoader()
case "pubsub":
publisherPubSubLoader()
case "kinesis":
publisherKinesisLoader()
}
}
8 changes: 8 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ networks:
cs-network:

services:
localstack:
image: localstack/localstack:3.5.0
container_name: localstack-main
ports:
- "0.0.0.0:4566:4566"
# - "4510-4599"
volumes:
- "/var/run/docker.sock:/var/run/docker.sock"
pubsub-emulator:
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:480.0.0
hostname: pubsub-emulator
Expand Down
58 changes: 57 additions & 1 deletion docs/docs/reference/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ Publisher specific configuration follows the pattern `PUBLISHER_${TYPE}_*` where

- Type `Optional`
- Default value: `kafka`
- Possible values: `kafka`, `pubsub`
- Possible values: `kafka`, `pubsub`, `kinesis`

### `PUBLISHER_KAFKA_CLIENT_BOOTSTRAP_SERVERS`

Expand Down Expand Up @@ -313,6 +313,62 @@ How long to wait before aborting a publish operation.
- Type `Optional`
- Default value `60000` (1 Minute)

### `PUBLISHER_KINESIS_AWS_REGION`

AWS Region of the target kinesis stream. The value of `AWS_REGION` is used as fallback if this variable is not set.

- Type `Required`

### `PUBLISHER_KINESIS_CREDENTIALS`

Path to [AWS Credentials file](https://docs.aws.amazon.com/sdkref/latest/guide/file-format.html).

You can also specify the credentials using `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables.

- Type `Optional`
- Default value `$HOME/.aws/credentials`

### `PUBLISHER_KINESIS_STREAM_AUTOCREATE`

Whether Raccoon should create a stream if it doesn't exist.

NOTE: We recommend that you create all streams that you need to publish to ahead of time.

- Type `Optional`
- Default value `false`

### `PUBLISHER_KINESIS_STREAM_MODE`

This configuration variable controls the `StreamMode` of the
streams created by Raccoon.

- Type `Optional`
- Default value `ON_DEMAND`
- Possible values: `ON_DEMAND`, `PROVISIONED`

### `PUBLISHER_KINESIS_STREAM_SHARDS`

This controls the number of shards configured for a stream created by Raccoon.

- Type `Optional`
- Default value `4`

### `PUBLISHER_KINESIS_STREAM_PROBE_INTERVAL_MS`

This specifies the time delay between stream status checks.

- Type `Optional`
- Default value `1000`


### `PUBLISHER_KINESIS_PUBLISH_TIMEOUT_MS`

How long to wait before aborting a publish operation.

- Type `Optional`
- Default value `60000`


## Metric

### `METRIC_RUNTIME_STATS_RECORD_INTERVAL_MS`
Expand Down
26 changes: 26 additions & 0 deletions docs/docs/reference/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ Raccoon uses Statsd protocol as way to report metrics. You can capture the metri

- [Server Connection](metrics.md#server-connection)
- [Kafka Publisher](metrics.md#kafka-publisher)
- [PubSub Publisher](metrics.md#pubsub-publisher)
- [Kinesis Publisher](metrics.md#kinesis-publisher)
- [Resource Usage](metrics.md#resource-usage)
- [Event Delivery](metrics.md#event-delivery)

Expand Down Expand Up @@ -163,6 +165,30 @@ Number of delivery failure caused by topic does not exist in PubSub.
- Type: `Count`
- Tags: `topic=topicname` `event_type=*` `conn_group=*`

## Kinesis Publisher

### `kinesis_messages_delivered_total`

Number of delivered events to Kinesis. The metric also contains false increments. To find the true value, one should use the difference between this and `kinesis_messages_undelivered_total` metric for the same tag/labels.

- Type: `Count`
- Tags: `success=false` `success=true` `conn_group=*` `event_type=*`

### `kinesis_messages_undelivered_total`

The count of false increments done by `kinesis_messages_delivered_total`. To be used in conjunction with the former for accurate metrics.

- Type: `Count`
- Tags: `success=false` `success=true` `conn_group=*` `event_type=*`


### `kinesis_unknown_stream_failure_total`

Number of delivery failures caused by the stream not existing in Kinesis.

- Type: `Count`
- Tags: `stream=streamname` `event_type=*` `conn_group=*`

## Resource Usage

### `server_mem_gc_triggered_current`
Expand Down
16 changes: 16 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,21 @@ require (
cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go/iam v1.1.7 // indirect
github.com/aws/aws-sdk-go-v2 v1.27.2 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
github.com/aws/aws-sdk-go-v2/config v1.27.18 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.18 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.9 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.9 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.11 // indirect
github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.10 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.20.11 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.5 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.12 // indirect
github.com/aws/smithy-go v1.20.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -38,6 +53,7 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
Expand Down
Loading

0 comments on commit cfa245c

Please sign in to comment.