Skip to content

Commit

Permalink
feat(autoscale): Dynamic Kinesis Scaling with Thresholds (#144)
Browse files Browse the repository at this point in the history
* feat(autoscale): Implement Dynamic Scaling

* chore(autoscale): Adjust Dynamic Scaling

* fix(terraform): Add Tags IAM Permission

* refactor(cloudwatch): Pin Datapoints to Eval Period

* docs(cloudwatch): Removed Reference to Dynamic Eval Periods
  • Loading branch information
jshlbrd authored Mar 14, 2024
1 parent d708580 commit 079fda9
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 64 deletions.
18 changes: 11 additions & 7 deletions build/terraform/aws/kinesis_data_stream/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ resource "aws_kinesis_stream" "stream" {
retention_period = var.config.retention
encryption_type = var.kms != null ? "KMS" : "NONE"
kms_key_id = var.kms != null ? var.kms.id : null
lifecycle {
ignore_changes = [shard_count]
}

tags = var.tags

lifecycle {
ignore_changes = [shard_count, tags]
}
}

# Applies the policy to each role in the access list.
Expand Down Expand Up @@ -44,14 +45,15 @@ data "aws_iam_policy_document" "access" {
statement {
effect = "Allow"
actions = [
"kinesis:AddTagsToStream",
"kinesis:DescribeStream*",
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:ListShards",
"kinesis:ListStreams",
"kinesis:ListTagsForStream",
"kinesis:PutRecord*",
"kinesis:SubscribeToShard",
"kinesis:SubscribeToShard",
"kinesis:RegisterStreamConsumer",
"kinesis:UpdateShardCount",
]
Expand Down Expand Up @@ -84,10 +86,11 @@ resource "aws_cloudwatch_metric_alarm" "metric_alarm_downscale" {
actions_enabled = true
alarm_actions = [var.config.autoscaling_topic]
evaluation_periods = 60
datapoints_to_alarm = 57
threshold = 0.25
datapoints_to_alarm = 60
threshold = 0.35
comparison_operator = "LessThanOrEqualToThreshold"
treat_missing_data = "ignore"

lifecycle {
ignore_changes = [metric_query, datapoints_to_alarm]
}
Expand Down Expand Up @@ -169,9 +172,10 @@ resource "aws_cloudwatch_metric_alarm" "metric_alarm_upscale" {
alarm_actions = [var.config.autoscaling_topic]
evaluation_periods = 5
datapoints_to_alarm = 5
threshold = 0.75
threshold = 0.70
comparison_operator = "GreaterThanOrEqualToThreshold"
treat_missing_data = "ignore"

lifecycle {
ignore_changes = [metric_query, datapoints_to_alarm]
}
Expand Down
16 changes: 8 additions & 8 deletions cmd/aws/lambda/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ This app handles ingest, transform, and load for data from these AWS services:

## autoscale

This app handles Kinesis Data Stream autoscaling through SNS notifications and CloudWatch alarms. The scaling behavior is to scale up / out if stream utilization is greater than 75% of the Kinesis service limits within a 5 minute period and scale down / in if stream utilization is less than 25% of the Kinesis service limits within a 60 minute period. In both cases, streams scale by 50%.
This app handles Kinesis Data Stream autoscaling through SNS notifications and CloudWatch alarms. Scaling is based on stream capacity as determined by the number and size of incoming records written to the stream. By default, the scaling behavior follows this pattern:

Stream utilization is based on volume (i.e., 60, 000 events per minute) and size (i.e., 10GB data per minute); these values are converted to a percentage (0.0 to 1.0) and the maximum of either is considered the stream's current utilization.
* If stream utilization is greater than 70% of the Kinesis service limits consistently within a 5 minute period, then scale up
* If stream utilization is less than 35% of the Kinesis service limits consistently within a 60 minute period, then scale down

By default, streams must be above the upper threshold for all 5 minutes to scale up and below the lower threshold for at least 57 minutes to scale down. These values can be overriden by the environment variables AUTOSCALE_KINESIS_UPSCALE_DATAPOINTS (cannot exceed 5 minutes) and AUTOSCALE_KINESIS_DOWNSCALE_DATAPOINTS (cannot exceed 60 minutes).
The scaling behavior is customizable using environment variables:

For example:
* `AUTOSCALE_KINESIS_THRESHOLD` - The target threshold to cause a scaling event. The default value is 0.7 (70%), but it can be set to any value between 0.4 (40%) and 0.9 (90%). If the threshold is low, then the stream is more sensitive to scaling up and less sensitive to scaling down. If the threshold is high, then the stream is less sensitive to scaling up and more sensitive to scaling down.
* `AUTOSCALE_KINESIS_UPSCALE_DATAPOINTS` - The number of data points required to scale up. The default value is 5, but it can be set to any value between 1 and 30. The number of data points affects the evaluation period; every 5 data points is equivalent to 5 minutes and the maximum evaluation period is 30 minutes. Use a higher value to reduce the frequency of scaling up.
* `AUTOSCALE_KINESIS_DOWNSCALE_DATAPOINTS` - The number of data points required to scale down. The default value is 60, but it can be set to any value between 1 and 360. The number of data points affects the evaluation period; every 60 data points is equivalent to 1 hour and the maximum evaluation period is 6 hours. Use a higher value to reduce the frequency of scaling down.

* If a stream is configured with 10 shards and it triggers the upscale alarm, then the stream is scaled up to 15 shards
* If a stream is configured with 10 shards and it triggers the downscale alarm, then the stream is scaled down to 5 shards

Shards will not scale evenly, but the autoscaling functionality follows [AWS best practices for resharding streams](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_UpdateShardCount.html). UpdateShardCount has many limitations that the application is designed around, but there may be times when these limits cannot be avoided; if any limits are met, then users should file a service limit increase with AWS. Although rare, the most common service limits that users may experience are:
Shards do not scale evenly, but the autoscaling follows [AWS best practices for resharding streams](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_UpdateShardCount.html). UpdateShardCount has many limitations that the application is designed around, but there may be times when these limits cannot be avoided; if any limits are met, then users should file a service limit increase with AWS. Although rare, the most common service limits that users may experience are:

* Scaling a stream more than 10 times per 24 hour rolling period
* Scaling a stream beyond 10000 shards
Expand Down
81 changes: 62 additions & 19 deletions cmd/aws/lambda/autoscale/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math"
"strconv"
"strings"
"time"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
Expand All @@ -16,10 +17,6 @@ import (
"github.com/tidwall/gjson"
)

const (
autoscalePercentage = 50.0
)

var (
cloudwatchAPI cloudwatch.API
kinesisAPI kinesis.API
Expand All @@ -43,7 +40,7 @@ func handler(ctx context.Context, snsEvent events.SNSEvent) error {
alarmName := gjson.Get(message, "AlarmName").String()
triggerMetrics := gjson.Get(message, "Trigger.Metrics")

log.WithField("alarm", alarmName).Info("received autoscale notification")
log.WithField("alarm", alarmName).Debug("Received autoscale notification.")

var stream string
for _, v := range triggerMetrics.Array() {
Expand All @@ -53,23 +50,25 @@ func handler(ctx context.Context, snsEvent events.SNSEvent) error {
break
}
}
log.WithField("alarm", alarmName).WithField("stream", stream).Info("parsed Kinesis stream")
log.WithField("alarm", alarmName).WithField("stream", stream).Debug("Parsed Kinesis stream.")

shards, err := kinesisAPI.ActiveShards(ctx, stream)
if err != nil {
return fmt.Errorf("handler: %v", err)
}
log.WithField("alarm", alarmName).WithField("stream", stream).WithField("count", shards).
Info("retrieved active shard count")
Info("Retrieved active shard count.")

var newShards int64
if strings.Contains(alarmName, "upscale") {
newShards = upscale(float64(shards), autoscalePercentage)
newShards = upscale(float64(shards))
}
if strings.Contains(alarmName, "downscale") {
newShards = downscale(float64(shards), autoscalePercentage)
newShards = downscale(float64(shards))
}

log.WithField("alarm", alarmName).WithField("stream", stream).WithField("count", newShards).Info("Calculated new shard count.")

tags, err := kinesisAPI.GetTags(ctx, stream)
if err != nil {
return fmt.Errorf("handler: %v", err)
Expand All @@ -83,7 +82,7 @@ func handler(ctx context.Context, snsEvent events.SNSEvent) error {
return fmt.Errorf("handler: %v", err)
}

log.WithField("stream", stream).WithField("count", minShard).Info("retrieved minimum shard count")
log.WithField("stream", stream).WithField("count", minShard).Debug("Retrieved minimum shard count.")
}

if *tag.Key == "MaximumShards" {
Expand All @@ -92,7 +91,28 @@ func handler(ctx context.Context, snsEvent events.SNSEvent) error {
return fmt.Errorf("handler: %v", err)
}

log.WithField("stream", stream).WithField("count", maxShard).Info("retrieved maximum shard count")
log.WithField("stream", stream).WithField("count", maxShard).Debug("Retrieved maximum shard count.")
}

// Tracking the last scaling event prevents scaling from occurring too frequently.
// If the current scaling event is an upscale, then the last scaling event must be at least 3 minutes ago.
// If the current scaling event is a downscale, then the last scaling event must be at least 30 minutes ago.
if *tag.Key == "LastScalingEvent" {
lastScalingEvent, err := time.Parse(time.RFC3339, *tag.Value)
if err != nil {
return fmt.Errorf("handler: %v", err)
}

if (time.Since(lastScalingEvent) < 3*time.Minute && strings.Contains(alarmName, "upscale")) ||
(time.Since(lastScalingEvent) < 30*time.Minute && strings.Contains(alarmName, "downscale")) {
log.WithField("stream", stream).WithField("time", lastScalingEvent).Info("Last scaling event is too recent.")

if err := cloudwatchAPI.UpdateKinesisAlarmState(ctx, alarmName, "Last scaling event is too recent"); err != nil {
return fmt.Errorf("handler: %v", err)
}

return nil
}
}
}

Expand All @@ -109,32 +129,55 @@ func handler(ctx context.Context, snsEvent events.SNSEvent) error {
}

if newShards == shards {
log.WithField("alarm", alarmName).WithField("stream", stream).WithField("count", shards).Info("active shard count is at minimum threshold, no updates necessary")
log.WithField("alarm", alarmName).WithField("stream", stream).WithField("count", shards).Info("Active shard count is at minimum threshold, no change is required.")
return nil
}

if err := kinesisAPI.UpdateShards(ctx, stream, newShards); err != nil {
return fmt.Errorf("handler: %v", err)
}
log.WithField("alarm", alarmName).WithField("stream", stream).WithField("count", newShards).Info("updated shards")

if err := kinesisAPI.UpdateTag(ctx, stream, "LastScalingEvent", time.Now().Format(time.RFC3339)); err != nil {
return fmt.Errorf("handler: %v", err)
}

log.WithField("alarm", alarmName).WithField("stream", stream).WithField("count", newShards).Info("Updated shard count.")

if err := cloudwatchAPI.UpdateKinesisDownscaleAlarm(ctx, stream+"_downscale", stream, topicArn, newShards); err != nil {
return fmt.Errorf("handler: %v", err)
}
log.WithField("alarm", stream+"_downscale").WithField("stream", stream).WithField("count", newShards).Info("reset alarm")
log.WithField("alarm", stream+"_downscale").WithField("stream", stream).WithField("count", newShards).Debug("Reset CloudWatch alarm.")

if err := cloudwatchAPI.UpdateKinesisUpscaleAlarm(ctx, stream+"_upscale", stream, topicArn, newShards); err != nil {
return fmt.Errorf("handler: %v", err)
}
log.WithField("alarm", stream+"_upscale").WithField("stream", stream).WithField("count", newShards).Info("reset alarm")
log.WithField("alarm", stream+"_upscale").WithField("stream", stream).WithField("count", newShards).Debug("Reset CloudWatch alarm.")

return nil
}

func downscale(shards, pct float64) int64 {
return int64(math.Ceil(shards - (shards * (pct / 100))))
func downscale(shards float64) int64 {
switch {
case shards < 5:
return int64(math.Ceil(shards / 2))
case shards < 13:
return int64(math.Ceil(shards / 1.75))
case shards < 33:
return int64(math.Ceil(shards / 1.5))
default:
return int64(math.Ceil(shards / 1.25))
}
}

func upscale(shards, pct float64) int64 {
return int64(math.Ceil(shards + (shards * (pct / 100))))
func upscale(shards float64) int64 {
switch {
case shards < 5:
return int64(math.Floor(shards * 2))
case shards < 13:
return int64(math.Floor(shards * 1.75))
case shards < 33:
return int64(math.Floor(shards * 1.5))
default:
return int64(math.Floor(shards * 1.25))
}
}
78 changes: 48 additions & 30 deletions internal/aws/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,40 +13,54 @@ import (
)

const (
// This is the period in seconds that the AWS Kinesis CloudWatch alarms
// will evaluate the metrics over.
kinesisMetricsPeriod = 60
// AWS Kinesis streams will scale down / in if they are less than 25% of the Kinesis service limits within a 60 minute / 1 hour period.
kinesisDownscaleEvaluationPeriod, kinesisDownscaleThreshold = 60, 0.25
// AWS Kinesis streams will scale up / out if they are greater than 75% of the Kinesis service limits within a 5 minute period.
kinesisUpscaleEvaluationPeriod, kinesisUpscaleThreshold = 5, 0.75
)

var (
// By default, AWS Kinesis streams must be below the lower threshold for 95% of the evaluation period (57 minutes) to scale down. This value can be overridden by the environment variable AUTOSCALE_KINESIS_DOWNSCALE_DATAPOINTS, but it cannot exceed 60 minutes.
kinesisDownscaleDatapoints = 57
// By default, AWS Kinesis streams must be above the upper threshold for 100% of the evaluation period (5 minutes) to scale up. This value can be overridden by the environment variable AUTOSCALE_KINESIS_UPSCALE_DATAPOINTS, but it cannot exceed 5 minutes.
// By default, AWS Kinesis streams must be below the lower threshold for
// 100% of the evaluation period (60 minutes) to scale down. This value can
// be overridden by the environment variable AUTOSCALE_KINESIS_DOWNSCALE_DATAPOINTS.
kinesisDownscaleDatapoints = 60
// By default, AWS Kinesis streams must be above the upper threshold for
// 100% of the evaluation period (5 minutes) to scale up. This value can
// be overridden by the environment variable AUTOSCALE_KINESIS_UPSCALE_DATAPOINTS.
kinesisUpscaleDatapoints = 5
// By default, AWS Kinesis streams will scale up if the incoming records and bytes
// are above 70% of the threshold. This value can be overridden by the environment
// variable AUTOSCALE_KINESIS_THRESHOLD, but it cannot be less than 40% or greater
// than 90%.
kinesisThreshold = 0.7
)

func init() {
if v, found := os.LookupEnv("AUTOSCALE_KINESIS_DOWNSCALE_DATAPOINTS"); found {
downscale, err := strconv.Atoi(v)
dps, err := strconv.Atoi(v)
if err != nil {
panic(err)
}

if downscale <= kinesisDownscaleEvaluationPeriod {
kinesisDownscaleDatapoints = downscale
}
kinesisDownscaleDatapoints = dps
}

if v, found := os.LookupEnv("AUTOSCALE_KINESIS_UPSCALE_DATAPOINTS"); found {
upscale, err := strconv.Atoi(v)
dps, err := strconv.Atoi(v)
if err != nil {
panic(err)
}

if upscale <= kinesisUpscaleEvaluationPeriod {
kinesisUpscaleDatapoints = upscale
kinesisUpscaleDatapoints = dps
}

if v, found := os.LookupEnv("AUTOSCALE_KINESIS_THRESHOLD"); found {
threshold, err := strconv.ParseFloat(v, 64)
if err != nil {
panic(err)
}

if threshold >= 0.4 && threshold <= 0.9 {
kinesisThreshold = threshold
}
}
}
Expand Down Expand Up @@ -80,16 +94,18 @@ func (a *API) IsEnabled() bool {

// UpdateKinesisDownscaleAlarm updates CloudWatch alarms that manage the scale down tracking for Kinesis streams.
func (a *API) UpdateKinesisDownscaleAlarm(ctx aws.Context, name, stream, topic string, shards int64) error {
downscaleThreshold := kinesisThreshold - 0.35

if _, err := a.Client.PutMetricAlarmWithContext(
ctx,
&cloudwatch.PutMetricAlarmInput{
AlarmName: aws.String(name),
AlarmDescription: aws.String(stream),
ActionsEnabled: aws.Bool(true),
AlarmActions: []*string{aws.String(topic)},
EvaluationPeriods: aws.Int64(kinesisDownscaleEvaluationPeriod),
EvaluationPeriods: aws.Int64(int64(kinesisDownscaleDatapoints)),
DatapointsToAlarm: aws.Int64(int64(kinesisDownscaleDatapoints)),
Threshold: aws.Float64(kinesisDownscaleThreshold),
Threshold: aws.Float64(downscaleThreshold),
ComparisonOperator: aws.String("LessThanOrEqualToThreshold"),
TreatMissingData: aws.String("ignore"),
Metrics: []*cloudwatch.MetricDataQuery{
Expand Down Expand Up @@ -170,12 +186,7 @@ func (a *API) UpdateKinesisDownscaleAlarm(ctx aws.Context, name, stream, topic s
return fmt.Errorf("updatealarm alarm %s stream %s: %v", name, stream, err)
}

if _, err := a.Client.SetAlarmStateWithContext(ctx,
&cloudwatch.SetAlarmStateInput{
AlarmName: aws.String(name),
StateValue: aws.String("INSUFFICIENT_DATA"),
StateReason: aws.String("Threshold value updated"),
}); err != nil {
if err := a.UpdateKinesisAlarmState(ctx, name, "Threshold value updated"); err != nil {
return fmt.Errorf("updatealarm alarm %s stream %s: %v", name, stream, err)
}

Expand All @@ -184,16 +195,18 @@ func (a *API) UpdateKinesisDownscaleAlarm(ctx aws.Context, name, stream, topic s

// UpdateKinesisUpscaleAlarm updates CloudWatch alarms that manage the scale up tracking for Kinesis streams.
func (a *API) UpdateKinesisUpscaleAlarm(ctx aws.Context, name, stream, topic string, shards int64) error {
upscaleThreshold := kinesisThreshold

if _, err := a.Client.PutMetricAlarmWithContext(
ctx,
&cloudwatch.PutMetricAlarmInput{
AlarmName: aws.String(name),
AlarmDescription: aws.String(stream),
ActionsEnabled: aws.Bool(true),
AlarmActions: []*string{aws.String(topic)},
EvaluationPeriods: aws.Int64(kinesisUpscaleEvaluationPeriod),
EvaluationPeriods: aws.Int64(int64(kinesisUpscaleDatapoints)),
DatapointsToAlarm: aws.Int64(int64(kinesisUpscaleDatapoints)),
Threshold: aws.Float64(kinesisUpscaleThreshold),
Threshold: aws.Float64(upscaleThreshold),
ComparisonOperator: aws.String("GreaterThanOrEqualToThreshold"),
TreatMissingData: aws.String("ignore"),
Metrics: []*cloudwatch.MetricDataQuery{
Expand Down Expand Up @@ -274,14 +287,19 @@ func (a *API) UpdateKinesisUpscaleAlarm(ctx aws.Context, name, stream, topic str
return fmt.Errorf("updatealarm alarm %s stream %s: %v", name, stream, err)
}

if _, err := a.Client.SetAlarmStateWithContext(ctx,
&cloudwatch.SetAlarmStateInput{
AlarmName: aws.String(name),
StateValue: aws.String("INSUFFICIENT_DATA"),
StateReason: aws.String("Threshold value updated"),
}); err != nil {
if err := a.UpdateKinesisAlarmState(ctx, name, "Threshold value updated"); err != nil {
return fmt.Errorf("updatealarm alarm %s stream %s: %v", name, stream, err)
}

return nil
}

func (a *API) UpdateKinesisAlarmState(ctx aws.Context, name, reason string) error {
_, err := a.Client.SetAlarmStateWithContext(ctx,
&cloudwatch.SetAlarmStateInput{
AlarmName: aws.String(name),
StateValue: aws.String("INSUFFICIENT_DATA"),
StateReason: aws.String(reason),
})
return err
}
Loading

0 comments on commit 079fda9

Please sign in to comment.