diff --git a/modules/firehose/config.go b/modules/firehose/config.go
index ad73908d..10b2e0c4 100644
--- a/modules/firehose/config.go
+++ b/modules/firehose/config.go
@@ -108,8 +108,10 @@ func readConfig(r resource.Resource, confJSON json.RawMessage, dc driverConf) (*
 		return nil, errors.ErrInvalid.WithMsgf("deployment_id must not have more than 53 chars")
 	}
 
+	// we name a consumer group by adding a sequence suffix to the deployment name
+	// this sequence will later be incremented to name new consumer group while resetting offset
 	if consumerID := cfg.EnvVariables[confKeyConsumerID]; consumerID == "" {
-		cfg.EnvVariables[confKeyConsumerID] = fmt.Sprintf("%s-0001", cfg.DeploymentID)
+		cfg.EnvVariables[confKeyConsumerID] = fmt.Sprintf("%s-1", cfg.DeploymentID)
 	}
 
 	rl := dc.RequestsAndLimits[defaultKey]
diff --git a/modules/firehose/driver_plan.go b/modules/firehose/driver_plan.go
index 3b0b5338..5f49975e 100644
--- a/modules/firehose/driver_plan.go
+++ b/modules/firehose/driver_plan.go
@@ -3,6 +3,9 @@ package firehose
 import (
 	"context"
 	"encoding/json"
+	"fmt"
+	"regexp"
+	"strconv"
 
 	"github.com/goto/entropy/core/module"
 	"github.com/goto/entropy/core/resource"
@@ -10,6 +13,13 @@ import (
 	"github.com/goto/entropy/pkg/kafka"
 )
 
+// SourceKafkaConsumerAutoOffsetReset is the firehose env variable carrying the
+// Kafka consumer `auto.offset.reset` policy.
+const SourceKafkaConsumerAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"
+
+var suffixRegex = regexp.MustCompile(`^([A-Za-z0-9-]+)-([0-9]+)$`)
+var errGroupIDFormat = fmt.Errorf("group id must match the format '%s'", suffixRegex)
+
 func (fd *firehoseDriver) Plan(_ context.Context, exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
 	switch act.Name {
 	case module.CreateAction:
@@ -18,6 +28,9 @@ func (fd *firehoseDriver) Plan(_ context.Context, exr module.ExpandedResource, a
 	case ResetAction:
 		return fd.planReset(exr, act)
 
+	case ResetV2Action:
+		return fd.planResetV2(exr, act)
+
 	default:
 		return fd.planChange(exr, act)
 	}
@@ -128,8 +141,8 @@ func (fd *firehoseDriver) planCreate(exr module.ExpandedResource, act module.Act
 	return &exr.Resource, nil
 }
 
-func (fd *firehoseDriver) planReset(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
-	resetValue, err := kafka.ParseResetParams(act.Params)
+func (fd *firehoseDriver) planResetV2(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
+	resetValue, err := kafka.ParseResetV2Params(act.Params)
 	if err != nil {
 		return nil, err
 	}
@@ -159,3 +172,57 @@ func (fd *firehoseDriver) planReset(exr module.ExpandedResource, act module.Acti
 	}
 	return &exr.Resource, nil
 }
+
+func (fd *firehoseDriver) planReset(exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
+	resetValue, err := kafka.ParseResetParams(act.Params)
+	if err != nil {
+		return nil, err
+	}
+
+	immediately := fd.timeNow()
+
+	curConf, err := readConfig(exr.Resource, exr.Resource.Spec.Configs, fd.conf)
+	if err != nil {
+		return nil, err
+	}
+
+	curConf.ResetOffset = resetValue
+	curConf.EnvVariables[SourceKafkaConsumerAutoOffsetReset] = resetValue
+	curConf.EnvVariables[confKeyConsumerID], err = getNewConsumerGroupID(curConf.EnvVariables[confKeyConsumerID])
+	if err != nil {
+		return nil, err
+	}
+
+	exr.Resource.Spec.Configs = mustJSON(curConf)
+	exr.Resource.State = resource.State{
+		Status:     resource.StatusPending,
+		Output:     exr.Resource.State.Output,
+		NextSyncAt: &immediately,
+		ModuleData: mustJSON(transientData{
+			PendingSteps: []string{
+				stepReleaseStop,   // stop the firehose
+				stepReleaseUpdate, // restart the deployment.
+			},
+		}),
+	}
+	return &exr.Resource, nil
+}
+
+// getNewConsumerGroupID increments the numeric suffix of the given consumer
+// group id (e.g. "foo-1" -> "foo-2"). It returns errGroupIDFormat when the id
+// does not end in "-<number>".
+func getNewConsumerGroupID(curGroup string) (string, error) {
+	matches := suffixRegex.FindStringSubmatch(curGroup)
+	if expLen := 3; len(matches) != expLen {
+		return "", errGroupIDFormat
+	}
+	prefix, sequence := matches[1], matches[2]
+
+	seq, err := strconv.Atoi(sequence)
+	if err != nil {
+		return "", errors.Errorf("error converting group sequence %s to int: %v", sequence, err)
+	}
+	seq++
+
+	return fmt.Sprintf("%s-%d", prefix, seq), nil
+}
diff --git a/modules/firehose/driver_plan_test.go b/modules/firehose/driver_plan_test.go
index c984979f..e4f81ed6 100644
--- a/modules/firehose/driver_plan_test.go
+++ b/modules/firehose/driver_plan_test.go
@@ -93,7 +93,7 @@ func TestFirehoseDriver_Plan(t *testing.T) {
 					"env_variables": map[string]string{
 						"SINK_TYPE":                      "LOG",
 						"INPUT_SCHEMA_PROTO_CLASS":       "com.foo.Bar",
-						"SOURCE_KAFKA_CONSUMER_GROUP_ID": "ABCDEFGHIJKLMNOPQRSTUVWXYZ-abcdefghij-3801d0-firehose-0001",
+						"SOURCE_KAFKA_CONSUMER_GROUP_ID": "ABCDEFGHIJKLMNOPQRSTUVWXYZ-abcdefghij-3801d0-firehose-1",
 						"SOURCE_KAFKA_BROKERS":           "localhost:9092",
 						"SOURCE_KAFKA_TOPIC":             "foo-log",
 					},
@@ -306,7 +306,7 @@ func TestFirehoseDriver_Plan(t *testing.T) {
 			act: module.ActionRequest{
 				Name: ResetAction,
 				Params: mustJSON(map[string]any{
-					"reset_to": "some_random",
+					"to": "some_random",
 				}),
 			},
 			wantErr: errors.ErrInvalid,
@@ -324,11 +324,12 @@ func TestFirehoseDriver_Plan(t *testing.T) {
 					"replicas":      1,
 					"deployment_id": "firehose-deployment-x",
 					"env_variables": map[string]string{
-						"SINK_TYPE":                      "LOG",
-						"INPUT_SCHEMA_PROTO_CLASS":       "com.foo.Bar",
-						"SOURCE_KAFKA_CONSUMER_GROUP_ID": "foo-bar-baz",
-						"SOURCE_KAFKA_BROKERS":           "localhost:9092",
-						"SOURCE_KAFKA_TOPIC":             "foo-log",
+						"SINK_TYPE":                "LOG",
+						"INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar",
+						"SOURCE_KAFKA_CONSUMER_GROUP_ID":                 "firehose-deployment-x-1",
+						"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET": "latest",
+						"SOURCE_KAFKA_BROKERS": "localhost:9092",
+						"SOURCE_KAFKA_TOPIC":   "foo-log",
 					},
 					"limits": map[string]any{
 						"cpu":    "200m",
@@ -352,7 +353,7 @@ func TestFirehoseDriver_Plan(t *testing.T) {
 			act: module.ActionRequest{
 				Name: ResetAction,
 				Params: mustJSON(map[string]any{
-					"to": "latest",
+					"to": "earliest",
 				}),
 			},
 			want: &resource.Resource{
@@ -365,13 +366,14 @@ func TestFirehoseDriver_Plan(t *testing.T) {
 					"replicas":      1,
 					"deployment_id": "firehose-deployment-x",
 					"env_variables": map[string]string{
-						"SINK_TYPE":                      "LOG",
-						"INPUT_SCHEMA_PROTO_CLASS":       "com.foo.Bar",
-						"SOURCE_KAFKA_CONSUMER_GROUP_ID": "foo-bar-baz",
-						"SOURCE_KAFKA_BROKERS":           "localhost:9092",
-						"SOURCE_KAFKA_TOPIC":             "foo-log",
+						"SINK_TYPE":                "LOG",
+						"INPUT_SCHEMA_PROTO_CLASS": "com.foo.Bar",
+						"SOURCE_KAFKA_CONSUMER_GROUP_ID":                 "firehose-deployment-x-2",
+						"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET": "earliest",
+						"SOURCE_KAFKA_BROKERS": "localhost:9092",
+						"SOURCE_KAFKA_TOPIC":   "foo-log",
 					},
-					"reset_offset": "latest",
+					"reset_offset": "earliest",
 					"limits": map[string]any{
 						"cpu":    "200m",
 						"memory": "512Mi",
@@ -391,10 +393,8 @@ func TestFirehoseDriver_Plan(t *testing.T) {
 					ReleaseName: "bar",
 				}),
 				ModuleData: mustJSON(transientData{
-					ResetOffsetTo: "latest",
 					PendingSteps: []string{
 						stepReleaseStop,
-						stepKafkaReset,
 						stepReleaseUpdate,
 					},
 				}),
@@ -558,3 +558,41 @@ func TestFirehoseDriver_Plan(t *testing.T) {
 		})
 	}
 }
+
+func TestGetNewConsumerGroupID(t *testing.T) {
+	t.Parallel()
+
+	table := []struct {
+		title           string
+		consumerGroupID string
+		want            string
+		wantErr         error
+	}{
+		{
+			title:           "invalid-group-id",
+			consumerGroupID: "test-firehose-xyz",
+			want:            "",
+			wantErr:         errGroupIDFormat,
+		},
+		{
+			title:           "valid-group-id",
+			consumerGroupID: "test-firehose-0999",
+			want:            "test-firehose-1000",
+			wantErr:         nil,
+		},
+	}
+
+	for _, tt := range table {
+		t.Run(tt.title, func(t *testing.T) {
+			got, err := getNewConsumerGroupID(tt.consumerGroupID)
+			if tt.wantErr != nil {
+				require.Error(t, err)
+				assert.Equal(t, "", got)
+				assert.ErrorIs(t, err, tt.wantErr)
+			} else {
+				assert.NoError(t, err)
+				assert.Equal(t, tt.want, got)
+			}
+		})
+	}
+}
diff --git a/modules/firehose/module.go b/modules/firehose/module.go
index d1b12e6a..2b519cd5 100644
--- a/modules/firehose/module.go
+++ b/modules/firehose/module.go
@@ -25,6 +25,7 @@ const (
 	StartAction   = "start"
 	StopAction    = "stop"
 	ResetAction   = "reset"
+	ResetV2Action = "reset-v2"
 	UpgradeAction = "upgrade"
 )
 
diff --git a/pkg/kafka/consumer_reset.go b/pkg/kafka/consumer_reset.go
index f2636354..a59e8e44 100644
--- a/pkg/kafka/consumer_reset.go
+++ b/pkg/kafka/consumer_reset.go
@@ -20,11 +20,15 @@ const (
 	resetDatetime = "datetime"
 )
 
-type ResetParams struct {
+type ResetV2Params struct {
 	To       string `json:"to"`
 	Datetime string `json:"datetime"`
 }
 
+type ResetParams struct {
+	To string `json:"to"`
+}
+
 // DoReset executes a kubernetes job with kafka-consumer-group.sh installed to
 // reset offset policy for the given consumer id on all topics.
 func DoReset(ctx context.Context, jobCluster *kube.Client, kubeNamespace, kafkaBrokers, kafkaConsumerID, kafkaResetValue, resetJobName string) error {
@@ -40,10 +44,10 @@ func DoReset(ctx context.Context, jobCluster *kube.Client, kubeNamespace, kafkaB
 	)
 }
 
-// ParseResetParams parses the given JSON data as reset parameters value and
+// ParseResetV2Params parses the given JSON data as reset parameters value and
 // returns the actual reset value to be used with DoReset().
-func ParseResetParams(bytes json.RawMessage) (string, error) {
-	var params ResetParams
+func ParseResetV2Params(bytes json.RawMessage) (string, error) {
+	var params ResetV2Params
 	if err := json.Unmarshal(bytes, &params); err != nil {
 		return "", errors.ErrInvalid.
 			WithMsgf("invalid reset params").
@@ -61,6 +65,25 @@ func ParseResetParams(bytes json.RawMessage) (string, error) {
 	return resetValue, nil
 }
 
+// ParseResetParams parses the given JSON data as reset parameters value and
+// returns the actual reset value to be used with DoReset().
+func ParseResetParams(bytes json.RawMessage) (string, error) {
+	var params ResetParams
+	if err := json.Unmarshal(bytes, &params); err != nil {
+		return "", errors.ErrInvalid.
+			WithMsgf("invalid reset params").
+			WithCausef(err.Error())
+	}
+
+	resetValue := strings.ToLower(params.To)
+	if resetValue != resetLatest && resetValue != resetEarliest {
+		return "", errors.ErrInvalid.
+			WithMsgf("reset_value must be one of %v", []string{resetEarliest, resetLatest})
+	}
+
+	return resetValue, nil
+}
+
 func prepCommand(brokers, consumerID, kafkaResetValue string) []string {
 	args := []string{
 		"kafka-consumer-groups.sh",