Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: old reset strategy #72

Merged
merged 14 commits into from
Sep 20, 2023
4 changes: 3 additions & 1 deletion modules/firehose/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,10 @@ func readConfig(r resource.Resource, confJSON json.RawMessage, dc driverConf) (*
return nil, errors.ErrInvalid.WithMsgf("deployment_id must not have more than 53 chars")
}

// we name a consumer group by adding a sequence suffix to the deployment name
// this sequence will later be incremented to name new consumer group while resetting offset
if consumerID := cfg.EnvVariables[confKeyConsumerID]; consumerID == "" {
cfg.EnvVariables[confKeyConsumerID] = fmt.Sprintf("%s-0001", cfg.DeploymentID)
cfg.EnvVariables[confKeyConsumerID] = fmt.Sprintf("%s-1", cfg.DeploymentID)
mabdh marked this conversation as resolved.
Show resolved Hide resolved
}

rl := dc.RequestsAndLimits[defaultKey]
Expand Down
66 changes: 64 additions & 2 deletions modules/firehose/driver_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,20 @@ package firehose
import (
"context"
"encoding/json"
"fmt"
"regexp"
"strconv"

"github.com/goto/entropy/core/module"
"github.com/goto/entropy/core/resource"
"github.com/goto/entropy/pkg/errors"
"github.com/goto/entropy/pkg/kafka"
)

// SourceKafkaConsumerAutoOffsetReset is the key in the firehose environment
// variables that planReset sets to the requested reset value.
const SourceKafkaConsumerAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"

// suffixRegex matches a consumer group ID of the form "<prefix>-<digits>".
// Capture group 1 is the prefix (letters, digits and hyphens) and capture
// group 2 is the trailing numeric sequence.
var suffixRegex = regexp.MustCompile(`^([A-Za-z0-9-]+)-([0-9]+)$`)

func (fd *firehoseDriver) Plan(_ context.Context, exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
switch act.Name {
case module.CreateAction:
Expand All @@ -18,6 +25,9 @@ func (fd *firehoseDriver) Plan(_ context.Context, exr module.ExpandedResource, a
case ResetAction:
return fd.planReset(exr, act)

case ResetV2Action:
return fd.planResetV2(exr, act)

default:
return fd.planChange(exr, act)
}
Expand Down Expand Up @@ -128,8 +138,8 @@ func (fd *firehoseDriver) planCreate(exr module.ExpandedResource, act module.Act
return &exr.Resource, nil
}

func (fd *firehoseDriver) planReset(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
resetValue, err := kafka.ParseResetParams(act.Params)
func (fd *firehoseDriver) planResetV2(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
resetValue, err := kafka.ParseResetV2Params(act.Params)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -159,3 +169,55 @@ func (fd *firehoseDriver) planReset(exr module.ExpandedResource, act module.Acti
}
return &exr.Resource, nil
}

// planReset plans a reset that switches the firehose to a new consumer group
// instead of rewriting offsets of the current one.
//
// It validates the action params via kafka.ParseResetParams, records the
// requested value both as the config's ResetOffset and as the
// SourceKafkaConsumerAutoOffsetReset environment variable, and replaces the
// consumer group ID with the one returned by getNewConsumerGroupID. The
// resource is then marked pending, scheduled for sync at the current time,
// with the stop and update release steps queued.
//
// Any error from param parsing, config reading or group ID generation is
// returned as-is with a nil resource.
func (fd *firehoseDriver) planReset(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
	resetTo, err := kafka.ParseResetParams(act.Params)
	if err != nil {
		return nil, err
	}

	now := fd.timeNow()

	conf, err := readConfig(exr.Resource, exr.Resource.Spec.Configs, fd.conf)
	if err != nil {
		return nil, err
	}

	conf.ResetOffset = resetTo
	conf.EnvVariables[SourceKafkaConsumerAutoOffsetReset] = resetTo

	// Move to the next consumer group in sequence so the new offset-reset
	// policy applies to a group without committed offsets.
	conf.EnvVariables[confKeyConsumerID], err = getNewConsumerGroupID(conf.EnvVariables[confKeyConsumerID])
	if err != nil {
		return nil, err
	}

	res := &exr.Resource
	res.Spec.Configs = mustJSON(conf)

	pending := transientData{
		PendingSteps: []string{
			stepReleaseStop,   // stop the firehose
			stepReleaseUpdate, // restart the deployment.
		},
	}
	res.State = resource.State{
		Status:     resource.StatusPending,
		Output:     res.State.Output,
		NextSyncAt: &now,
		ModuleData: mustJSON(pending),
	}
	return res, nil
}

func getNewConsumerGroupID(curGroup string) (string, error) {
matches := suffixRegex.FindStringSubmatch(curGroup)
if expLen := 3; len(matches) != expLen {
return "", errors.New("group id doest not match regex")
ishanarya0 marked this conversation as resolved.
Show resolved Hide resolved
}
prefix, sequence := matches[1], matches[2]

seq, err := strconv.Atoi(sequence)
if err != nil {
return "", errors.Errorf("error converting group sequence %s to int: %v", sequence, err)
} else {
seq++
}

return fmt.Sprintf("%s-%d", prefix, seq), nil
}
72 changes: 56 additions & 16 deletions modules/firehose/driver_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestFirehoseDriver_Plan(t *testing.T) {
"env_variables": map[string]string{
"SINK_TYPE": "LOG",
"INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar",
"SOURCE_KAFKA_CONSUMER_GROUP_ID": "ABCDEFGHIJKLMNOPQRSTUVWXYZ-abcdefghij-3801d0-firehose-0001",
"SOURCE_KAFKA_CONSUMER_GROUP_ID": "ABCDEFGHIJKLMNOPQRSTUVWXYZ-abcdefghij-3801d0-firehose-1",
"SOURCE_KAFKA_BROKERS": "localhost:9092",
"SOURCE_KAFKA_TOPIC": "foo-log",
},
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestFirehoseDriver_Plan(t *testing.T) {
act: module.ActionRequest{
Name: ResetAction,
Params: mustJSON(map[string]any{
"reset_to": "some_random",
"to": "some_random",
}),
},
wantErr: errors.ErrInvalid,
Expand All @@ -324,11 +324,12 @@ func TestFirehoseDriver_Plan(t *testing.T) {
"replicas": 1,
"deployment_id": "firehose-deployment-x",
"env_variables": map[string]string{
"SINK_TYPE": "LOG",
"INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar",
"SOURCE_KAFKA_CONSUMER_GROUP_ID": "foo-bar-baz",
"SOURCE_KAFKA_BROKERS": "localhost:9092",
"SOURCE_KAFKA_TOPIC": "foo-log",
"SINK_TYPE": "LOG",
"INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar",
"SOURCE_KAFKA_CONSUMER_GROUP_ID": "firehose-deployment-x-1",
"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET": "latest",
"SOURCE_KAFKA_BROKERS": "localhost:9092",
"SOURCE_KAFKA_TOPIC": "foo-log",
},
"limits": map[string]any{
"cpu": "200m",
Expand All @@ -352,7 +353,7 @@ func TestFirehoseDriver_Plan(t *testing.T) {
act: module.ActionRequest{
Name: ResetAction,
Params: mustJSON(map[string]any{
"to": "latest",
"to": "earliest",
}),
},
want: &resource.Resource{
Expand All @@ -365,13 +366,14 @@ func TestFirehoseDriver_Plan(t *testing.T) {
"replicas": 1,
"deployment_id": "firehose-deployment-x",
"env_variables": map[string]string{
"SINK_TYPE": "LOG",
"INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar",
"SOURCE_KAFKA_CONSUMER_GROUP_ID": "foo-bar-baz",
"SOURCE_KAFKA_BROKERS": "localhost:9092",
"SOURCE_KAFKA_TOPIC": "foo-log",
"SINK_TYPE": "LOG",
"INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar",
"SOURCE_KAFKA_CONSUMER_GROUP_ID": "firehose-deployment-x-2",
"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET": "earliest",
"SOURCE_KAFKA_BROKERS": "localhost:9092",
"SOURCE_KAFKA_TOPIC": "foo-log",
},
"reset_offset": "latest",
"reset_offset": "earliest",
"limits": map[string]any{
"cpu": "200m",
"memory": "512Mi",
Expand All @@ -391,10 +393,8 @@ func TestFirehoseDriver_Plan(t *testing.T) {
ReleaseName: "bar",
}),
ModuleData: mustJSON(transientData{
ResetOffsetTo: "latest",
PendingSteps: []string{
stepReleaseStop,
stepKafkaReset,
stepReleaseUpdate,
},
}),
Expand Down Expand Up @@ -558,3 +558,43 @@ func TestFirehoseDriver_Plan(t *testing.T) {
})
}
}

// TestGetNewConsumerGroupID checks that getNewConsumerGroupID increments the
// numeric suffix of a well-formed consumer group ID and returns an error for
// an ID that has no numeric suffix.
func TestGetNewConsumerGroupID(t *testing.T) {
	t.Parallel()

	table := []struct {
		title           string
		consumerGroupID string
		want            string
		wantErr         string
	}{
		{
			title:           "invalid-group-id",
			consumerGroupID: "test-firehose-xyz",
			want:            "",
			wantErr:         "group id doest not match regex",
		},
		{
			title:           "valid-group-id",
			consumerGroupID: "test-firehose-0999",
			want:            "test-firehose-1000",
			wantErr:         "",
		},
	}

	for _, tt := range table {
		t.Run(tt.title, func(t *testing.T) {
			got, err := getNewConsumerGroupID(tt.consumerGroupID)
			if tt.wantErr != "" {
				// require: err.Error() below would panic on a nil error.
				require.Error(t, err)
				assert.Equal(t, "", got)
				// testify expects (expected, actual) so failure output is labelled correctly.
				assert.Equal(t, tt.wantErr, err.Error())
				return
			}

			// Stop on an unexpected error instead of comparing a meaningless result.
			require.NoError(t, err)
			assert.Equal(t, tt.want, got)
		})
	}
}
1 change: 1 addition & 0 deletions modules/firehose/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
StartAction = "start"
StopAction = "stop"
ResetAction = "reset"
ResetV2Action = "reset-v2"
UpgradeAction = "upgrade"
)

Expand Down
31 changes: 27 additions & 4 deletions pkg/kafka/consumer_reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ const (
resetDatetime = "datetime"
)

type ResetParams struct {
type ResetV2Params struct {
To string `json:"to"`
Datetime string `json:"datetime"`
}

// ResetParams is the JSON payload decoded by ParseResetParams.
type ResetParams struct {
// To is the requested reset target, read from the "to" key.
// ParseResetParams lower-cases it and accepts only resetEarliest or
// resetLatest.
To string `json:"to"`
}

// DoReset executes a kubernetes job with kafka-consumer-group.sh installed to
// reset offset policy for the given consumer id on all topics.
func DoReset(ctx context.Context, jobCluster *kube.Client, kubeNamespace, kafkaBrokers, kafkaConsumerID, kafkaResetValue, resetJobName string) error {
Expand All @@ -40,10 +44,10 @@ func DoReset(ctx context.Context, jobCluster *kube.Client, kubeNamespace, kafkaB
)
}

// ParseResetParams parses the given JSON data as reset parameters value and
// ParseResetV2Params parses the given JSON data as reset parameters value and
// returns the actual reset value to be used with DoReset().
func ParseResetParams(bytes json.RawMessage) (string, error) {
var params ResetParams
func ParseResetV2Params(bytes json.RawMessage) (string, error) {
var params ResetV2Params
if err := json.Unmarshal(bytes, &params); err != nil {
return "", errors.ErrInvalid.
WithMsgf("invalid reset params").
Expand All @@ -61,6 +65,25 @@ func ParseResetParams(bytes json.RawMessage) (string, error) {
return resetValue, nil
}

// ParseResetParams decodes the given JSON data into ResetParams and returns
// the lower-cased "to" value.
//
// Only resetLatest and resetEarliest are accepted. Malformed JSON or any other
// value results in an error derived from errors.ErrInvalid.
func ParseResetParams(bytes json.RawMessage) (string, error) {
	var req ResetParams
	if err := json.Unmarshal(bytes, &req); err != nil {
		return "", errors.ErrInvalid.
			WithMsgf("invalid reset params").
			WithCausef(err.Error())
	}

	// Matching is case-insensitive: the value is normalised before comparison.
	switch target := strings.ToLower(req.To); target {
	case resetLatest, resetEarliest:
		return target, nil
	default:
		return "", errors.ErrInvalid.
			WithMsgf("reset_value must be one of %v", []string{resetEarliest, resetLatest})
	}
}

func prepCommand(brokers, consumerID, kafkaResetValue string) []string {
args := []string{
"kafka-consumer-groups.sh",
Expand Down
Loading