Skip to content

Commit

Permalink
working tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso committed Aug 20, 2024
1 parent fbb82fb commit 5bd8a22
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 5 deletions.
1 change: 1 addition & 0 deletions pkg/foundation/log/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
NodeIDField = "node_id"
ParallelWorkerIDField = "parallel_worker_id"
PipelineIDField = "pipeline_id"
PipelineStatusField = "pipeline_status"
ProcessorIDField = "processor_id"
RecordPositionField = "record_position"
RequestIDField = "request_id"
Expand Down
1 change: 1 addition & 0 deletions pkg/pipeline/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import "github.com/conduitio/conduit/pkg/foundation/cerrors"
var (
ErrTimeout = cerrors.New("operation timed out")
ErrGracefulShutdown = cerrors.New("graceful shutdown")
ErrForceStop = cerrors.New("force stop")
ErrPipelineRunning = cerrors.New("pipeline is running")
ErrPipelineNotRunning = cerrors.New("pipeline not running")
ErrInstanceNotFound = cerrors.New("pipeline instance not found")
Expand Down
16 changes: 11 additions & 5 deletions pkg/pipeline/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (s *Service) Stop(ctx context.Context, pipelineID string, force bool) error
return err
}

if pl.Status != StatusRunning {
if pl.Status != StatusRunning && pl.Status != StatusRecovering {
return cerrors.Errorf("can't stop pipeline with status %q: %w", pl.Status, ErrPipelineNotRunning)
}

Expand All @@ -138,7 +138,10 @@ func (s *Service) Stop(ctx context.Context, pipelineID string, force bool) error
}

func (s *Service) stopGraceful(ctx context.Context, pl *Instance, reason error) error {
s.logger.Info(ctx).Str(log.PipelineIDField, pl.ID).Msg("gracefully stopping pipeline")
s.logger.Info(ctx).
Str(log.PipelineIDField, pl.ID).
Any(log.PipelineStatusField, pl.Status).
Msg("gracefully stopping pipeline")
var errs []error
for _, n := range pl.n {
if node, ok := n.(stream.StoppableNode); ok {
Expand All @@ -155,8 +158,11 @@ func (s *Service) stopGraceful(ctx context.Context, pl *Instance, reason error)
}

func (s *Service) stopForceful(ctx context.Context, pl *Instance) error {
s.logger.Info(ctx).Str(log.PipelineIDField, pl.ID).Msg("force stopping pipeline")
pl.t.Kill(cerrors.New("force stop"))
s.logger.Info(ctx).
Str(log.PipelineIDField, pl.ID).
Any(log.PipelineStatusField, pl.Status).
Msg("force stopping pipeline")
pl.t.Kill(ErrForceStop)
for _, n := range pl.n {
if node, ok := n.(stream.ForceStoppableNode); ok {
// stop all pub nodes
Expand All @@ -171,7 +177,7 @@ func (s *Service) stopForceful(ctx context.Context, pl *Instance) error {
// (i.e. that existing messages get processed but not new messages get produced).
func (s *Service) StopAll(ctx context.Context, reason error) {
for _, pl := range s.instances {
if pl.Status != StatusRunning {
if pl.Status != StatusRunning && pl.Status != StatusRecovering {
continue
}
err := s.stopGraceful(ctx, pl, reason)
Expand Down
122 changes: 122 additions & 0 deletions pkg/pipeline/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,128 @@ func TestServiceLifecycle_PipelineError(t *testing.T) {
is.True(cerrors.Is(event.Error, wantErr))
}

func TestServiceLifecycle_StopAll_Recovering(t *testing.T) {
type testCase struct {
name string
stopFn func(ctx context.Context, is *is.I, pipelineService *Service, pipelineID string)
// whether we expect the source plugin's Stop() function to be called
// (doesn't happen when force-stopping)
wantSourceStop bool
want Status
wantErr error
}

runTest := func(t *testing.T, tc testCase) {
is := is.New(t)
ctx, killAll := context.WithCancel(context.Background())
defer killAll()
logger := log.New(zerolog.Nop())
db := &inmemory.DB{}
persister := connector.NewPersister(logger, db, time.Second, 3)

ps := NewService(logger, db)

// create a host pipeline
pl, err := ps.Create(ctx, uuid.NewString(), Config{Name: "test pipeline"}, ProvisionTypeAPI)
is.NoErr(err)

// create mocked connectors
// source will stop and return ErrGracefulShutdown which should signal to the
// service that everything went well and the pipeline was gracefully shutdown
ctrl := gomock.NewController(t)
wantRecords := generateRecords(0)
source, sourceDispenser := generatorSource(ctrl, persister, wantRecords, nil, tc.wantSourceStop)
destination, destDispenser := asserterDestination(ctrl, persister, wantRecords)
dlq, dlqDispenser := asserterDestination(ctrl, persister, nil)
pl.DLQ.Plugin = dlq.Plugin

pl, err = ps.AddConnector(ctx, pl.ID, source.ID)
is.NoErr(err)
pl, err = ps.AddConnector(ctx, pl.ID, destination.ID)
is.NoErr(err)

// start the pipeline now that everything is set up
err = ps.Start(
ctx,
testConnectorFetcher{
source.ID: source,
destination.ID: destination,
testDLQID: dlq,
},
testProcessorFetcher{},
testPluginFetcher{
source.Plugin: sourceDispenser,
destination.Plugin: destDispenser,
dlq.Plugin: dlqDispenser,
},
pl.ID,
)
is.NoErr(err)

// wait for pipeline to finish consuming records from the source
time.Sleep(100 * time.Millisecond)

pl.Status = StatusRecovering
tc.stopFn(ctx, is, ps, pl.ID)

// wait for pipeline to finish
err = pl.Wait()
if tc.wantErr != nil {
is.True(err != nil)
} else {
is.NoErr(err)
is.Equal("", pl.Error)
}

is.Equal(tc.want, pl.Status)
}

testCases := []testCase{
{
name: "system stop (graceful shutdown err)",
stopFn: func(ctx context.Context, is *is.I, ps *Service, pipelineID string) {
ps.StopAll(ctx, ErrGracefulShutdown)
},
wantSourceStop: true,
want: StatusSystemStopped,
},
{
name: "system stop (terrible err)",
stopFn: func(ctx context.Context, is *is.I, ps *Service, pipelineID string) {
ps.StopAll(ctx, cerrors.New("terrible err"))
},
wantSourceStop: true,
want: StatusDegraded,
wantErr: cerrors.New("terrible err"),
},
{
name: "user stop (graceful)",
stopFn: func(ctx context.Context, is *is.I, ps *Service, pipelineID string) {
err := ps.Stop(ctx, pipelineID, false)
is.NoErr(err)
},
wantSourceStop: true,
want: StatusUserStopped,
},
{
name: "user stop (force)",
stopFn: func(ctx context.Context, is *is.I, ps *Service, pipelineID string) {
err := ps.Stop(ctx, pipelineID, true)
is.NoErr(err)
},
wantSourceStop: false,
want: StatusDegraded,
wantErr: ErrForceStop,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
runTest(t, tc)
})
}
}

func TestServiceLifecycle_PipelineStop(t *testing.T) {
is := is.New(t)
ctx, killAll := context.WithCancel(context.Background())
Expand Down

0 comments on commit 5bd8a22

Please sign in to comment.