Skip to content

Commit

Permalink
feat: old reset strategy (#72)
Browse files Browse the repository at this point in the history
* feat: use group incremental reset strategy

* test: getNewConsumerGroupID

* fix: lint issue

* fix: lint issue

* fix: use regex

* fix: lint issue

* test: fix group id numbers

* fix: use shorted if syntax

* fix: test

* fix: readConfig

* refactor: rename new-reset to reset-v2

* fix: error message

* fix: var name

* fix: var name
  • Loading branch information
ishanarya0 authored Sep 20, 2023
1 parent ee9a105 commit 2c4ef1f
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 23 deletions.
4 changes: 3 additions & 1 deletion modules/firehose/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,10 @@ func readConfig(r resource.Resource, confJSON json.RawMessage, dc driverConf) (*
return nil, errors.ErrInvalid.WithMsgf("deployment_id must not have more than 53 chars")
}

// we name a consumer group by adding a sequence suffix to the deployment name
// this sequence will later be incremented to name new consumer group while resetting offset
if consumerID := cfg.EnvVariables[confKeyConsumerID]; consumerID == "" {
cfg.EnvVariables[confKeyConsumerID] = fmt.Sprintf("%s-0001", cfg.DeploymentID)
cfg.EnvVariables[confKeyConsumerID] = fmt.Sprintf("%s-1", cfg.DeploymentID)
}

rl := dc.RequestsAndLimits[defaultKey]
Expand Down
67 changes: 65 additions & 2 deletions modules/firehose/driver_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,21 @@ package firehose
import (
"context"
"encoding/json"
"fmt"
"regexp"
"strconv"

"github.com/goto/entropy/core/module"
"github.com/goto/entropy/core/resource"
"github.com/goto/entropy/pkg/errors"
"github.com/goto/entropy/pkg/kafka"
)

const SourceKafkaConsumerAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"

var suffixRegex = regexp.MustCompile(`^([A-Za-z0-9-]+)-([0-9]+)$`)
var errGroupIDFormat = fmt.Errorf("group id must match the format '%s'", suffixRegex)

func (fd *firehoseDriver) Plan(_ context.Context, exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
switch act.Name {
case module.CreateAction:
Expand All @@ -18,6 +26,9 @@ func (fd *firehoseDriver) Plan(_ context.Context, exr module.ExpandedResource, a
case ResetAction:
return fd.planReset(exr, act)

case ResetV2Action:
return fd.planResetV2(exr, act)

default:
return fd.planChange(exr, act)
}
Expand Down Expand Up @@ -128,8 +139,8 @@ func (fd *firehoseDriver) planCreate(exr module.ExpandedResource, act module.Act
return &exr.Resource, nil
}

func (fd *firehoseDriver) planReset(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
resetValue, err := kafka.ParseResetParams(act.Params)
func (fd *firehoseDriver) planResetV2(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
resetValue, err := kafka.ParseResetV2Params(act.Params)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -159,3 +170,55 @@ func (fd *firehoseDriver) planReset(exr module.ExpandedResource, act module.Acti
}
return &exr.Resource, nil
}

func (fd *firehoseDriver) planReset(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
resetValue, err := kafka.ParseResetParams(act.Params)
if err != nil {
return nil, err
}

immediately := fd.timeNow()

curConf, err := readConfig(exr.Resource, exr.Resource.Spec.Configs, fd.conf)
if err != nil {
return nil, err
}

curConf.ResetOffset = resetValue
curConf.EnvVariables[SourceKafkaConsumerAutoOffsetReset] = resetValue
curConf.EnvVariables[confKeyConsumerID], err = getNewConsumerGroupID(curConf.EnvVariables[confKeyConsumerID])
if err != nil {
return nil, err
}

exr.Resource.Spec.Configs = mustJSON(curConf)
exr.Resource.State = resource.State{
Status: resource.StatusPending,
Output: exr.Resource.State.Output,
NextSyncAt: &immediately,
ModuleData: mustJSON(transientData{
PendingSteps: []string{
stepReleaseStop, // stop the firehose
stepReleaseUpdate, // restart the deployment.
},
}),
}
return &exr.Resource, nil
}

func getNewConsumerGroupID(curGroup string) (string, error) {
matches := suffixRegex.FindStringSubmatch(curGroup)
if expLen := 3; len(matches) != expLen {
return "", errGroupIDFormat
}
prefix, sequence := matches[1], matches[2]

seq, err := strconv.Atoi(sequence)
if err != nil {
return "", errors.Errorf("error converting group sequence %s to int: %v", sequence, err)
} else {
seq++
}

return fmt.Sprintf("%s-%d", prefix, seq), nil
}
72 changes: 56 additions & 16 deletions modules/firehose/driver_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestFirehoseDriver_Plan(t *testing.T) {
"env_variables": map[string]string{
"SINK_TYPE": "LOG",
"INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar",
"SOURCE_KAFKA_CONSUMER_GROUP_ID": "ABCDEFGHIJKLMNOPQRSTUVWXYZ-abcdefghij-3801d0-firehose-0001",
"SOURCE_KAFKA_CONSUMER_GROUP_ID": "ABCDEFGHIJKLMNOPQRSTUVWXYZ-abcdefghij-3801d0-firehose-1",
"SOURCE_KAFKA_BROKERS": "localhost:9092",
"SOURCE_KAFKA_TOPIC": "foo-log",
},
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestFirehoseDriver_Plan(t *testing.T) {
act: module.ActionRequest{
Name: ResetAction,
Params: mustJSON(map[string]any{
"reset_to": "some_random",
"to": "some_random",
}),
},
wantErr: errors.ErrInvalid,
Expand All @@ -324,11 +324,12 @@ func TestFirehoseDriver_Plan(t *testing.T) {
"replicas": 1,
"deployment_id": "firehose-deployment-x",
"env_variables": map[string]string{
"SINK_TYPE": "LOG",
"INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar",
"SOURCE_KAFKA_CONSUMER_GROUP_ID": "foo-bar-baz",
"SOURCE_KAFKA_BROKERS": "localhost:9092",
"SOURCE_KAFKA_TOPIC": "foo-log",
"SINK_TYPE": "LOG",
"INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar",
"SOURCE_KAFKA_CONSUMER_GROUP_ID": "firehose-deployment-x-1",
"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET": "latest",
"SOURCE_KAFKA_BROKERS": "localhost:9092",
"SOURCE_KAFKA_TOPIC": "foo-log",
},
"limits": map[string]any{
"cpu": "200m",
Expand All @@ -352,7 +353,7 @@ func TestFirehoseDriver_Plan(t *testing.T) {
act: module.ActionRequest{
Name: ResetAction,
Params: mustJSON(map[string]any{
"to": "latest",
"to": "earliest",
}),
},
want: &resource.Resource{
Expand All @@ -365,13 +366,14 @@ func TestFirehoseDriver_Plan(t *testing.T) {
"replicas": 1,
"deployment_id": "firehose-deployment-x",
"env_variables": map[string]string{
"SINK_TYPE": "LOG",
"INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar",
"SOURCE_KAFKA_CONSUMER_GROUP_ID": "foo-bar-baz",
"SOURCE_KAFKA_BROKERS": "localhost:9092",
"SOURCE_KAFKA_TOPIC": "foo-log",
"SINK_TYPE": "LOG",
"INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar",
"SOURCE_KAFKA_CONSUMER_GROUP_ID": "firehose-deployment-x-2",
"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET": "earliest",
"SOURCE_KAFKA_BROKERS": "localhost:9092",
"SOURCE_KAFKA_TOPIC": "foo-log",
},
"reset_offset": "latest",
"reset_offset": "earliest",
"limits": map[string]any{
"cpu": "200m",
"memory": "512Mi",
Expand All @@ -391,10 +393,8 @@ func TestFirehoseDriver_Plan(t *testing.T) {
ReleaseName: "bar",
}),
ModuleData: mustJSON(transientData{
ResetOffsetTo: "latest",
PendingSteps: []string{
stepReleaseStop,
stepKafkaReset,
stepReleaseUpdate,
},
}),
Expand Down Expand Up @@ -558,3 +558,43 @@ func TestFirehoseDriver_Plan(t *testing.T) {
})
}
}

func TestGetNewConsumerGroupID(t *testing.T) {
t.Parallel()

table := []struct {
title string
deploymentID string
consumerGroupID string
want string
wantErr error
}{
{
title: "invalid-group-id",
consumerGroupID: "test-firehose-xyz",
want: "",
wantErr: errGroupIDFormat,
},
{
title: "valid-group-id",
consumerGroupID: "test-firehose-0999",
want: "test-firehose-1000",
wantErr: nil,
},
}

for _, tt := range table {
t.Run(tt.title, func(t *testing.T) {
got, err := getNewConsumerGroupID(tt.consumerGroupID)
if tt.wantErr != nil {
require.Error(t, err)
assert.Equal(t, "", got)
assert.ErrorIs(t, err, tt.wantErr)
} else {
assert.NoError(t, err)
require.NotNil(t, got)
assert.Equal(t, tt.want, got)
}
})
}
}
1 change: 1 addition & 0 deletions modules/firehose/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
StartAction = "start"
StopAction = "stop"
ResetAction = "reset"
ResetV2Action = "reset-v2"
UpgradeAction = "upgrade"
)

Expand Down
31 changes: 27 additions & 4 deletions pkg/kafka/consumer_reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ const (
resetDatetime = "datetime"
)

type ResetParams struct {
type ResetV2Params struct {
To string `json:"to"`
Datetime string `json:"datetime"`
}

type ResetParams struct {
To string `json:"to"`
}

// DoReset executes a kubernetes job with kafka-consumer-group.sh installed to
// reset offset policy for the given consumer id on all topics.
func DoReset(ctx context.Context, jobCluster *kube.Client, kubeNamespace, kafkaBrokers, kafkaConsumerID, kafkaResetValue, resetJobName string) error {
Expand All @@ -40,10 +44,10 @@ func DoReset(ctx context.Context, jobCluster *kube.Client, kubeNamespace, kafkaB
)
}

// ParseResetParams parses the given JSON data as reset parameters value and
// ParseResetV2Params parses the given JSON data as reset parameters value and
// returns the actual reset value to be used with DoReset().
func ParseResetParams(bytes json.RawMessage) (string, error) {
var params ResetParams
func ParseResetV2Params(bytes json.RawMessage) (string, error) {
var params ResetV2Params
if err := json.Unmarshal(bytes, &params); err != nil {
return "", errors.ErrInvalid.
WithMsgf("invalid reset params").
Expand All @@ -61,6 +65,25 @@ func ParseResetParams(bytes json.RawMessage) (string, error) {
return resetValue, nil
}

// ParseResetParams parses the given JSON data as reset parameters value and
// returns the actual reset value to be used with DoReset().
func ParseResetParams(bytes json.RawMessage) (string, error) {
var params ResetParams
if err := json.Unmarshal(bytes, &params); err != nil {
return "", errors.ErrInvalid.
WithMsgf("invalid reset params").
WithCausef(err.Error())
}

resetValue := strings.ToLower(params.To)
if resetValue != resetLatest && resetValue != resetEarliest {
return "", errors.ErrInvalid.
WithMsgf("reset_value must be one of %v", []string{resetEarliest, resetLatest})
}

return resetValue, nil
}

func prepCommand(brokers, consumerID, kafkaResetValue string) []string {
args := []string{
"kafka-consumer-groups.sh",
Expand Down

0 comments on commit 2c4ef1f

Please sign in to comment.