From c3a7f1fea3a6b9c9533fd6a0e129568f189c8eb4 Mon Sep 17 00:00:00 2001 From: Marcin Maliszkiewicz Date: Wed, 21 Jun 2023 20:54:25 +0200 Subject: [PATCH 1/2] fix(cmd): stop generators as last to avoid deadlock on exit As jobs unconditionally read from generator's channel there can be a situation when generator is closed and will not produce any new values while some jobs still wait on empty channel deadlocking and preventing process exit. --- cmd/gemini/root.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index 768b2f78..8f70e979 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -259,7 +259,8 @@ func run(_ *cobra.Command, _ []string) error { stop.StartOsSignalsTransmitter(logger, &warmupStopFlag, &workStopFlag) pump := jobs.NewPump(ctx, logger) - generators := createGenerators(ctx, schema, schemaConfig, distFunc, concurrency, partitionCount, logger) + genCtx, genCancel := context.WithCancel(context.Background()) + generators := createGenerators(genCtx, schema, schemaConfig, distFunc, concurrency, partitionCount, logger) if !nonInteractive { sp := createSpinner(interactive()) @@ -294,8 +295,13 @@ func run(_ *cobra.Command, _ []string) error { if err = jobsList.Run(ctx, schema, schemaConfig, st, pump, generators, globalStatus, logger, seed, &workStopFlag, failFast, verbose); err != nil { logger.Debug("error detected", zap.Error(err)) } - } + + // stop generators, this needs to be done after jobs finish + // because otherwise they can deadlock in pick() func waiting + // for more values to be generated + genCancel() + logger.Info("test finished") globalStatus.PrintResult(outFile, schema, version) if globalStatus.HasErrors() { From f862f7cb887776feeb2ea7b4b188e6fe6210b0fc Mon Sep 17 00:00:00 2001 From: Marcin Maliszkiewicz Date: Wed, 21 Jun 2023 20:57:39 +0200 Subject: [PATCH 2/2] fix(pkg): match goroutines start/stop messages log level --- pkg/generators/generator.go | 2 +- pkg/jobs/jobs.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/generators/generator.go b/pkg/generators/generator.go index e062689d..b3640f60 100644 --- a/pkg/generators/generator.go +++ b/pkg/generators/generator.go @@ -162,7 +162,7 @@ func (g *Generator) start() { g.fillAllPartitions() select { case <-gCtx.Done(): - g.logger.Debug("stopping partition key generation loop", + g.logger.Info("stopping partition key generation loop", zap.Uint64("keys_created", g.cntCreated), zap.Uint64("keys_emitted", g.cntEmitted)) return gCtx.Err() diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index c065b5cf..a394b11e 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -177,7 +177,7 @@ func mutationJob( } select { case <-ctx.Done(): - logger.Debug("mutation job terminated") + logger.Info("mutation job terminated") return nil case hb := <-pump: time.Sleep(hb) @@ -286,7 +286,7 @@ func warmupJob( } select { case <-ctx.Done(): - logger.Debug("warmup job terminated") + logger.Info("warmup job terminated") return nil default: }