Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Add Flags maxConcurrentTests (and version) to allow for concurrent test execution. #9

Merged
merged 6 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ RUN apt-get update && apt-get -y upgrade && apt-get install -y --no-install-reco
jq \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
COPY --from=builder bin/assertoor /assertoor
COPY --from=builder /src/bin/assertoor /assertoor
ENTRYPOINT ["/assertoor"]
31 changes: 25 additions & 6 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"

"github.com/ethpandaops/assertoor/pkg/coordinator"
"github.com/ethpandaops/assertoor/pkg/coordinator/buildinfo"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
Expand All @@ -14,6 +15,21 @@ var rootCmd = &cobra.Command{
Use: "assertoor",
Short: "Runs a configured test until completion or error",
Run: func(cmd *cobra.Command, _ []string) {
if version && buildinfo.BuildRelease != "" {
log.Printf("Release: %s\n", buildinfo.BuildRelease)
return
}

if version && buildinfo.BuildVersion != "" {
log.Printf("Version: %s\n", buildinfo.BuildVersion)
return
}

if version {
log.Print("Local build; Unknown version\n")
return
}

config, err := coordinator.NewConfig(cfgFile)
if err != nil {
log.Fatal(err)
Expand All @@ -31,7 +47,7 @@ var rootCmd = &cobra.Command{
logr.SetLevel(logrus.DebugLevel)
}

coord := coordinator.NewCoordinator(config, logr, metricsPort)
coord := coordinator.NewCoordinator(config, logr, metricsPort, maxConcurrentTests)

if err := coord.Run(cmd.Context()); err != nil {
log.Fatal(err)
Expand All @@ -41,10 +57,12 @@ var rootCmd = &cobra.Command{
}

var (
cfgFile string
logFormat string
verbose bool
metricsPort int
cfgFile string
logFormat string
verbose bool
metricsPort int
maxConcurrentTests int
version bool
)

// Execute adds all child commands to the root command and sets flags appropriately.
Expand All @@ -62,6 +80,7 @@ func init() {
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file")
rootCmd.PersistentFlags().StringVar(&logFormat, "log-format", "text", "log format (default is text). Valid values are 'text', 'json'")
rootCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output")

rootCmd.Flags().IntVar(&maxConcurrentTests, "maxConcurrentTests", 1, "Number of tests to run concurrently")
rootCmd.Flags().IntVarP(&metricsPort, "metrics-port", "", 9090, "Port to serve Prometheus metrics on")
rootCmd.Flags().BoolVarP(&version, "version", "", false, "Print version information")
}
14 changes: 12 additions & 2 deletions pkg/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ type Coordinator struct {
testHistory []types.Test
testRegistryMutex sync.RWMutex
testNotificationChan chan bool
maxConcurrentTests int
}

func NewCoordinator(config *Config, log logrus.FieldLogger, metricsPort int) *Coordinator {
func NewCoordinator(config *Config, log logrus.FieldLogger, metricsPort, maxConcurrentTests int) *Coordinator {
return &Coordinator{
log: logger.NewLogger(&logger.ScopeOptions{
Parent: log,
Expand All @@ -58,6 +59,7 @@ func NewCoordinator(config *Config, log logrus.FieldLogger, metricsPort int) *Co
testQueue: []types.Test{},
testHistory: []types.Test{},
testNotificationChan: make(chan bool, 1),
maxConcurrentTests: maxConcurrentTests,
}
}

Expand Down Expand Up @@ -267,6 +269,8 @@ func (c *Coordinator) createTestRun(descriptor types.TestDescriptor, configOverr
}

func (c *Coordinator) runTestExecutionLoop(ctx context.Context) {
semaphore := make(chan bool, c.maxConcurrentTests)

for {
var nextTest types.Test

Expand All @@ -280,7 +284,13 @@ func (c *Coordinator) runTestExecutionLoop(ctx context.Context) {

if nextTest != nil {
// run next test
c.runTest(ctx, nextTest)
testFunc := func(nextTest types.Test) {
defer func() { <-semaphore }()
c.runTest(ctx, nextTest)
}
semaphore <- true

go testFunc(nextTest)
} else {
// sleep and wait for queue notification
select {
Expand Down
Loading