Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Matching simulation improvements #6224

Merged
merged 1 commit into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions host/matching_simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"context"
"flag"
"fmt"
"os"
"reflect"
"strings"
"sync"
Expand All @@ -63,6 +64,7 @@ type operation string

const (
operationPollForDecisionTask operation = "PollForDecisionTask"
defaultTestCase = "testdata/matching_simulation_default.yaml"
)

type operationStats struct {
Expand All @@ -82,7 +84,10 @@ type operationAggStats struct {
func TestMatchingSimulationSuite(t *testing.T) {
flag.Parse()

confPath := "testdata/matching_simulation.yaml"
confPath := os.Getenv("MATCHING_SIMULATION_CASE")
if confPath == "" {
confPath = defaultTestCase
}
clusterConfig, err := GetTestClusterConfig(confPath)
if err != nil {
t.Fatalf("failed creating cluster config from %s, err: %v", confPath, err)
Expand Down Expand Up @@ -176,31 +181,37 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() {
numPollers := getNumPollers(s.testClusterConfig.MatchingConfig.SimulationConfig.NumPollers)
pollDuration := getPollDuration(s.testClusterConfig.MatchingConfig.SimulationConfig.PollTimeout)
polledTasksCounter := int32(0)
maxTasksToGenerate := getMaxTaskstoGenerate(s.testClusterConfig.MatchingConfig.SimulationConfig.MaxTaskToGenerate)
var tasksToReceive sync.WaitGroup
tasksToReceive.Add(maxTasksToGenerate)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simulation runs for 180s at most (timeout set here) and maxTasksToGenerate might not be hit if it's set too high. please add a comment in the test code here about this to help identify "simulation is stuck" cases due to misconfiguration

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, thanks.

var pollerWG sync.WaitGroup
for i := 0; i < numPollers; i++ {
pollerWG.Add(1)
go s.poll(ctx, matchingClient, domainID, tasklist, &polledTasksCounter, &pollerWG, pollDuration, statsCh)
go s.poll(ctx, matchingClient, domainID, tasklist, &polledTasksCounter, &pollerWG, pollDuration, statsCh, &tasksToReceive)
}

// wait a bit for pollers to start.
time.Sleep(300 * time.Millisecond)

startTime := time.Now()
// Start task generators
generatedTasksCounter := int32(0)
lastTaskScheduleID := int32(0)
numGenerators := getNumGenerators(s.testClusterConfig.MatchingConfig.SimulationConfig.NumTaskGenerators)
taskGenerateInterval := getTaskGenerateInterval(s.testClusterConfig.MatchingConfig.SimulationConfig.TaskGeneratorTickInterval)
maxTasksToGenerate := getMaxTaskstoGenerate(s.testClusterConfig.MatchingConfig.SimulationConfig.MaxTaskToGenerate)
var generatorWG sync.WaitGroup
for i := 1; i <= numGenerators; i++ {
generatorWG.Add(1)
go s.generate(ctx, matchingClient, domainID, tasklist, maxTasksToGenerate, taskGenerateInterval, &generatedTasksCounter, &lastTaskScheduleID, &generatorWG)
}

// Let it run for a while
sleepDuration := 60 * time.Second
s.log("Wait %v for simulation to run", sleepDuration)
time.Sleep(sleepDuration)
// Let it run until all tasks have been polled.
// There's a test timeout configured in docker/buildkite/docker-compose-local-matching-simulation.yml that you
// can change if your test case needs more time
s.log("Waiting until all tasks are received")
tasksToReceive.Wait()
executionTime := time.Now().Sub(startTime)
s.log("Completed benchmark in %v", (time.Now().Sub(startTime)))
s.log("Canceling context to stop pollers and task generators")
cancel()
pollerWG.Wait()
Expand All @@ -216,7 +227,7 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() {
// Don't change the start/end line format as it is used by scripts to parse the summary info
testSummary := []string{}
testSummary = append(testSummary, "Simulation Summary:")
testSummary = append(testSummary, fmt.Sprintf("Simulation Duration: %v", sleepDuration))
testSummary = append(testSummary, fmt.Sprintf("Simulation Duration: %v", executionTime))
testSummary = append(testSummary, fmt.Sprintf("Num of Pollers: %d", numPollers))
testSummary = append(testSummary, fmt.Sprintf("Poll Timeout: %v", pollDuration))
testSummary = append(testSummary, fmt.Sprintf("Num of Task Generators: %d", numGenerators))
Expand Down Expand Up @@ -277,7 +288,7 @@ func (s *MatchingSimulationSuite) generate(
return
}
decisionTask := newDecisionTask(domainID, tasklist, scheduleID)
reqCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
reqCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
err := matchingClient.AddDecisionTask(reqCtx, decisionTask)
cancel()
if err != nil {
Expand All @@ -299,6 +310,7 @@ func (s *MatchingSimulationSuite) poll(
wg *sync.WaitGroup,
pollDuration time.Duration,
statsCh chan *operationStats,
tasksToReceive *sync.WaitGroup,
) {
defer wg.Done()
t := time.NewTicker(50 * time.Millisecond)
Expand Down Expand Up @@ -344,6 +356,7 @@ func (s *MatchingSimulationSuite) poll(

atomic.AddInt32(polledTasksCounter, 1)
s.log("PollForDecisionTask got a task with startedid: %d. resp: %+v", resp.StartedEventID, resp)
tasksToReceive.Done()
}
}
}
Expand Down
23 changes: 23 additions & 0 deletions host/testdata/matching_simulation_burst.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
enablearchival: false
clusterno: 1
messagingclientconfig:
usemock: true
historyconfig:
numhistoryshards: 4
numhistoryhosts: 1
matchingconfig:
nummatchinghosts: 4
simulationconfig:
tasklistwritepartitions: 2
tasklistreadpartitions: 2
numpollers: 10
numtaskgenerators: 2
taskgeneratortickinterval: 10ms
maxtasktogenerate: 1500
polltimeout: 5s
forwardermaxoutstandingpolls: 20
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the default setting in prod is 1

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just adding this as an example for now, I'll be adding a full suite that we can use to validate it.

forwardermaxoutstandingtasks: 1
forwardermaxratepersecond: 10
forwardermaxchildrenpernode: 20
workerconfig:
enableasyncwfconsumer: false
3 changes: 2 additions & 1 deletion scripts/run_matching_simulator.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ resultFolder="matching-simulator-output"
mkdir -p "$resultFolder"
eventLogsFile="$resultFolder/events.json"
testSummaryFile="$resultFolder/$testName-summary.txt"
testCase="testdata/matching_simulation_${1:-default}.yaml"


echo "Building test image"
Expand All @@ -19,7 +20,7 @@ docker-compose -f docker/buildkite/docker-compose-local-matching-simulation.yml
echo "Running the test"
docker-compose \
-f docker/buildkite/docker-compose-local-matching-simulation.yml \
run --rm matching-simulator \
run -e MATCHING_SIMULATION_CASE=$testCase --rm matching-simulator \
| grep -a --line-buffered "Matching New Event" \
| sed "s/Matching New Event: //" \
| jq . > "$eventLogsFile"
Expand Down
Loading