Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add retry counter on failed reconciliation #255

Merged
merged 16 commits into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 67 additions & 12 deletions core/application/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,22 @@ package application
import (
"context"
"fmt"
"time"

"github.com/sirupsen/logrus"

"github.com/weaveworks/flintlock/api/events"
"github.com/weaveworks/flintlock/core/models"
"github.com/weaveworks/flintlock/core/plans"
"github.com/weaveworks/flintlock/core/ports"
portsctx "github.com/weaveworks/flintlock/core/ports/context"
"github.com/weaveworks/flintlock/pkg/defaults"
"github.com/weaveworks/flintlock/pkg/log"
"github.com/weaveworks/flintlock/pkg/planner"
)

// backoffBaseInSeconds is the base delay, in seconds, that reschedule multiplies
// by the current retry count to compute how long to wait before the next attempt.
const backoffBaseInSeconds = 20

func (a *app) ReconcileMicroVM(ctx context.Context, id, namespace string) error {
logger := log.GetLogger(ctx).WithField("action", "reconcile")

Expand Down Expand Up @@ -52,52 +57,99 @@ func (a *app) ResyncMicroVMs(ctx context.Context, namespace string) error {
return nil
}

func (a *app) plan(spec *models.MicroVM, logger *logrus.Entry) (planner.Plan, error) {
func (a *app) plan(spec *models.MicroVM, logger *logrus.Entry) planner.Plan {
l := logger.WithField("stage", "plan")
l.Info("Generate plan")

if spec.Status.Retry > a.cfg.MaximumRetry {
return nil, reachedMaximumRetryError{vmid: spec.ID, retries: spec.Status.Retry}
}

// Delete only if the spec was marked as deleted.
if spec.Spec.DeletedAt != 0 {
input := &plans.DeletePlanInput{
StateDirectory: a.cfg.RootStateDir,
VM: spec,
}

return plans.MicroVMDeletePlan(input), nil
return plans.MicroVMDeletePlan(input)
}

input := &plans.CreateOrUpdatePlanInput{
StateDirectory: a.cfg.RootStateDir,
VM: spec,
}

return plans.MicroVMCreateOrUpdatePlan(input), nil
return plans.MicroVMCreateOrUpdatePlan(input)
}

func (a *app) reschedule(ctx context.Context, logger *logrus.Entry, spec *models.MicroVM) error {
spec.Status.Retry++
waitTime := time.Duration(spec.Status.Retry*backoffBaseInSeconds) * time.Second
spec.Status.NotBefore = time.Now().Add(waitTime).Unix()

logger.Infof(
"[%d/%d] reconciliation failed, reschedule at %s",
yitsushi marked this conversation as resolved.
Show resolved Hide resolved
spec.Status.Retry,
a.cfg.MaximumRetry,
time.Unix(spec.Status.NotBefore, 0),
)

if _, err := a.ports.Repo.Save(ctx, spec); err != nil {
return fmt.Errorf("saving spec after plan failed: %w", err)
}

go func(id, ns string, sleepTime time.Duration) {
logger.Info("Wait to emit update")
time.Sleep(sleepTime)
logger.Info("Emit update")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to be a bit more clear on these logs?

like "waiting to publish update event" or "waiting to reschedule for update"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we even need these lines; I added them while I was working on it.


_ = a.ports.EventService.Publish(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not care about logging this err? I could imagine that being an annoying one to debug if it failed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I intentionally ignored it. This goroutine is only a booster for the system; if it fails, the next resync will do the trick. But we can log it. Hopefully people don't get confused reading the log, because the message appears at "random" times as it's not part of the main flow (async goroutine).

context.Background(),
defaults.TopicMicroVMEvents,
&events.MicroVMSpecUpdated{
ID: id,
Namespace: ns,
},
)
}(spec.ID.Name(), spec.ID.Namespace(), waitTime)

return nil
}

func (a *app) reconcile(ctx context.Context, spec *models.MicroVM, logger *logrus.Entry) error {
l := logger.WithField("vmid", spec.ID.String())
l.Info("Starting reconciliation")
localLogger := logger.WithField("vmid", spec.ID.String())
localLogger.Info("Starting reconciliation")

plan, planErr := a.plan(spec, l)
if planErr != nil {
return planErr
if spec.Status.Retry > a.cfg.MaximumRetry {
spec.Status.State = models.FailedState

logger.Error(reachedMaximumRetryError{vmid: spec.ID, retries: spec.Status.Retry})

return nil
}

if spec.Status.NotBefore > 0 && time.Now().Before(time.Unix(spec.Status.NotBefore, 0)) {
return nil
}

plan := a.plan(spec, localLogger)

execCtx := portsctx.WithPorts(ctx, a.ports)

executionID, err := a.ports.IdentifierService.GenerateRandom()
if err != nil {
if scheduleErr := a.reschedule(ctx, localLogger, spec); scheduleErr != nil {
richardcase marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("saving spec after plan failed: %w", scheduleErr)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more relevant error message? (ditto line 150)

}

return fmt.Errorf("generating plan execution id: %w", err)
}

actuator := planner.NewActuator()

stepCount, err := actuator.Execute(execCtx, plan, executionID)
if err != nil {
if scheduleErr := a.reschedule(ctx, localLogger, spec); scheduleErr != nil {
return fmt.Errorf("saving spec after plan failed: %w", scheduleErr)
}

return fmt.Errorf("executing plan: %w", err)
}

Expand All @@ -109,6 +161,9 @@ func (a *app) reconcile(ctx context.Context, spec *models.MicroVM, logger *logru
return nil
}

spec.Status.Retry = 0
spec.Status.NotBefore = 0

if _, err := a.ports.Repo.Save(ctx, spec); err != nil {
return fmt.Errorf("saving spec after plan execution: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions core/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var (
ErrUnsupportedIfaceType = errors.New("unsupported network interface type")
ErrIfaceNotFound = errors.New("network interface not found")
ErrMissingStatusInfo = errors.New("status is not defined")
ErrUnableToBoot = errors.New("microvm is unable to boot")
)

// TopicNotFoundError is an error created when a topic with a specific name isn't found.
Expand Down
2 changes: 2 additions & 0 deletions core/models/microvm.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type MicroVMStatus struct {
NetworkInterfaces NetworkInterfaceStatuses `json:"network_interfaces"`
// Retry is a counter about how many times we retried to reconcile.
Retry int `json:"retry"`
// NotBefore is the Unix timestamp (in seconds) before which the microvm must not be reconciled again.
yitsushi marked this conversation as resolved.
Show resolved Hide resolved
NotBefore int64 `json:"not_before" validate:"omitempty"`
}

// Kernel is the specification of the kernel and its arguments.
Expand Down
2 changes: 2 additions & 0 deletions core/plans/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ package plans
const (
MicroVMDeletePlanName = "microvm_delete"
MicroVMCreateOrUpdatePlanName = "microvm_create_update"

// microVMBootTime is passed to the microvm start step when the create/update
// plan is built. NOTE(review): presumably the boot wait time in seconds — confirm the unit.
microVMBootTime = 5
)
2 changes: 1 addition & 1 deletion core/plans/microvm_create_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (p *microvmCreateOrUpdatePlan) Create(ctx context.Context) ([]planner.Proce
}

// MicroVM provider start
if err := p.addStep(ctx, microvm.NewStartStep(p.vm, ports.Provider)); err != nil {
if err := p.addStep(ctx, microvm.NewStartStep(p.vm, ports.Provider, microVMBootTime)); err != nil {
return nil, fmt.Errorf("adding microvm start step: %w", err)
}

Expand Down
4 changes: 4 additions & 0 deletions core/steps/event/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,7 @@ func (s *eventPublish) Do(ctx context.Context) ([]planner.Procedure, error) {

return nil, nil
}

// Verify is a no-op for the event publish step: it performs no check and
// always returns nil.
func (s *eventPublish) Verify(ctx context.Context) error {
return nil
}
4 changes: 4 additions & 0 deletions core/steps/event/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ func TestNewPublish(t *testing.T) {
// now than crying later if the system does not do what we want.
shouldDo, _ := step.ShouldDo(ctx)
subSteps, err := step.Do(ctx)
verifyErr := step.Verify(ctx)

g.Expect(shouldDo).To(g.BeTrue())
g.Expect(subSteps).To(g.BeEmpty())
g.Expect(err).To(g.BeNil())
g.Expect(verifyErr).To(g.BeNil())
}

func TestNewPublish_eventServiceFailure(t *testing.T) {
Expand All @@ -67,7 +69,9 @@ func TestNewPublish_eventServiceFailure(t *testing.T) {
step := event.NewPublish(testTopic, evt, eventService)

subSteps, err := step.Do(ctx)
verifyErr := step.Verify(ctx)

g.Expect(subSteps).To(g.BeEmpty())
g.Expect(err).ToNot(g.BeNil())
g.Expect(verifyErr).To(g.BeNil())
}
4 changes: 4 additions & 0 deletions core/steps/microvm/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,7 @@ func (s *createStep) Do(ctx context.Context) ([]planner.Procedure, error) {

return nil, nil
}

// Verify is a no-op for the microvm create step: it performs no check and
// always returns nil.
func (s *createStep) Verify(ctx context.Context) error {
return nil
}
10 changes: 10 additions & 0 deletions core/steps/microvm/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@ func TestNewCreateStep(t *testing.T) {

shouldDo, shouldErr := step.ShouldDo(ctx)
subSteps, doErr := step.Do(ctx)
verifyErr := step.Verify(ctx)

g.Expect(shouldDo).To(g.BeTrue())
g.Expect(shouldErr).To(g.BeNil())
g.Expect(subSteps).To(g.BeEmpty())
g.Expect(doErr).To(g.BeNil())
g.Expect(verifyErr).To(g.BeNil())
}

func TestNewCreateStep_StateCheck(t *testing.T) {
Expand Down Expand Up @@ -96,9 +98,11 @@ func TestNewCreateStep_StateCheck(t *testing.T) {
Return(testCase.State, nil)

shouldDo, shouldErr := step.ShouldDo(ctx)
verifyErr := step.Verify(ctx)

g.Expect(shouldDo).To(g.Equal(testCase.ExpectToRun))
g.Expect(shouldErr).To(g.BeNil())
g.Expect(verifyErr).To(g.BeNil())
}
}

Expand All @@ -119,9 +123,11 @@ func TestNewCreateStep_StateCheckError(t *testing.T) {
Return(ports.MicroVMStateUnknown, errors.New("i have no idea"))

shouldDo, shouldErr := step.ShouldDo(ctx)
verifyErr := step.Verify(ctx)

g.Expect(shouldDo).To(g.BeFalse())
g.Expect(shouldErr).ToNot(g.BeNil())
g.Expect(verifyErr).To(g.BeNil())
}

func TestNewCreateStep_VMIsNotDefined(t *testing.T) {
Expand All @@ -137,9 +143,11 @@ func TestNewCreateStep_VMIsNotDefined(t *testing.T) {
step := microvm.NewCreateStep(vm, microVMService)

subSteps, err := step.Do(ctx)
verifyErr := step.Verify(ctx)

g.Expect(subSteps).To(g.BeEmpty())
g.Expect(err).To(g.MatchError(internalerr.ErrSpecRequired))
g.Expect(verifyErr).To(g.BeNil())
}

func TestNewCreateStep_ServiceCreateError(t *testing.T) {
Expand All @@ -159,7 +167,9 @@ func TestNewCreateStep_ServiceCreateError(t *testing.T) {
Return(errors.New("ensuring state dir: ...."))

subSteps, err := step.Do(ctx)
verifyErr := step.Verify(ctx)

g.Expect(subSteps).To(g.BeEmpty())
g.Expect(err).ToNot(g.BeNil())
g.Expect(verifyErr).To(g.BeNil())
}
4 changes: 4 additions & 0 deletions core/steps/microvm/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,7 @@ func (s *deleteStep) Do(ctx context.Context) ([]planner.Procedure, error) {

return nil, nil
}

// Verify is a no-op for the microvm delete step: it performs no check and
// always returns nil.
func (s *deleteStep) Verify(ctx context.Context) error {
return nil
}
10 changes: 10 additions & 0 deletions core/steps/microvm/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ func TestNewDeleteStep(t *testing.T) {

shouldDo, shouldErr := step.ShouldDo(ctx)
subSteps, doErr := step.Do(ctx)
verifyErr := step.Verify(ctx)

g.Expect(shouldDo).To(g.BeTrue())
g.Expect(shouldErr).To(g.BeNil())
g.Expect(subSteps).To(g.BeEmpty())
g.Expect(doErr).To(g.BeNil())
g.Expect(verifyErr).To(g.BeNil())
}

func TestNewDeleteStep_StateCheck(t *testing.T) {
Expand Down Expand Up @@ -97,9 +99,11 @@ func TestNewDeleteStep_StateCheck(t *testing.T) {
Return(testCase.State, nil)

shouldDo, shouldErr := step.ShouldDo(ctx)
verifyErr := step.Verify(ctx)

g.Expect(shouldDo).To(g.Equal(testCase.ExpectToRun))
g.Expect(shouldErr).To(g.BeNil())
g.Expect(verifyErr).To(g.BeNil())
}
}

Expand All @@ -120,9 +124,11 @@ func TestNewDeleteStep_StateCheckError(t *testing.T) {
Return(ports.MicroVMStateUnknown, errors.New("i have no idea"))

shouldDo, shouldErr := step.ShouldDo(ctx)
verifyErr := step.Verify(ctx)

g.Expect(shouldDo).To(g.BeFalse())
g.Expect(shouldErr).ToNot(g.BeNil())
g.Expect(verifyErr).To(g.BeNil())
}

func TestNewDeleteStep_VMIsNotDefined(t *testing.T) {
Expand All @@ -138,9 +144,11 @@ func TestNewDeleteStep_VMIsNotDefined(t *testing.T) {
step := microvm.NewDeleteStep(vm, microVMService)

subSteps, err := step.Do(ctx)
verifyErr := step.Verify(ctx)

g.Expect(subSteps).To(g.BeEmpty())
g.Expect(err).To(g.MatchError(internalerr.ErrSpecRequired))
g.Expect(verifyErr).To(g.BeNil())
}

func TestNewDeleteStep_ServiceDeleteError(t *testing.T) {
Expand All @@ -160,7 +168,9 @@ func TestNewDeleteStep_ServiceDeleteError(t *testing.T) {
Return(errors.New("ensuring state dir: ...."))

subSteps, err := step.Do(ctx)
verifyErr := step.Verify(ctx)

g.Expect(subSteps).To(g.BeEmpty())
g.Expect(err).ToNot(g.BeNil())
g.Expect(verifyErr).To(g.BeNil())
}
Loading