diff --git a/src/aggregator/integration/integration.go b/src/aggregator/integration/integration.go index 94c56d3d67..fcf23f84fe 100644 --- a/src/aggregator/integration/integration.go +++ b/src/aggregator/integration/integration.go @@ -32,7 +32,7 @@ func waitUntil(fn conditionFn, timeout time.Duration) bool { if fn() { return true } - time.Sleep(time.Second) + time.Sleep(10 * time.Millisecond) } return false } diff --git a/src/aggregator/integration/setup.go b/src/aggregator/integration/setup.go index 1df66c7fbd..1f5d254a01 100644 --- a/src/aggregator/integration/setup.go +++ b/src/aggregator/integration/setup.go @@ -21,13 +21,14 @@ package integration import ( + "encoding/json" "errors" "fmt" + "io/ioutil" "net/http" "sort" "sync" "testing" - "time" "github.com/m3db/m3/src/aggregator/aggregator" "github.com/m3db/m3/src/aggregator/aggregator/handler" @@ -246,14 +247,41 @@ func (ts *testServerSetup) newClient() *client { return newClient(ts.rawTCPAddr, ts.opts.ClientBatchSize(), connectTimeout) } +func (ts *testServerSetup) getStatusResponse(path string, response interface{}) error { + resp, err := http.Get("http://" + ts.httpAddr + path) //nolint + if err != nil { + return err + } + + defer resp.Body.Close() //nolint:errcheck + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("got a non-200 status code: %v", resp.StatusCode) + } + return json.Unmarshal(b, response) +} + func (ts *testServerSetup) waitUntilServerIsUp() error { - c := ts.newClient() - defer c.close() + isUp := func() bool { + var resp httpserver.Response + if err := ts.getStatusResponse(httpserver.HealthPath, &resp); err != nil { + return false + } - serverIsUp := func() bool { return c.testConnection() } - if waitUntil(serverIsUp, ts.opts.ServerStateChangeTimeout()) { + if resp.State == "OK" { + return true + } + + return false + } + + if waitUntil(isUp, ts.opts.ServerStateChangeTimeout()) { return nil } + return errServerStartTimedOut } @@ -302,20 +330,22 @@ func (ts *testServerSetup) startServer() error { func (ts *testServerSetup) waitUntilLeader() error { isLeader := func() bool { - leader, err := ts.leaderService.Leader(ts.electionKey) - if err != nil { + var resp httpserver.StatusResponse + if err := ts.getStatusResponse(httpserver.StatusPath, &resp); err != nil { return false } - return leader == ts.leaderValue + + if resp.Status.FlushStatus.ElectionState == aggregator.LeaderState { + return true + } + return false } - if !waitUntil(isLeader, ts.opts.ElectionStateChangeTimeout()) { - return errLeaderElectionTimeout + + if waitUntil(isLeader, ts.opts.ElectionStateChangeTimeout()) { + return nil } - // TODO(xichen): replace the sleep here by using HTTP client to explicit - // curl the server for election status. - // Give the server some time to transition into leader state if needed. - time.Sleep(time.Second) - return nil + + return errLeaderElectionTimeout } func (ts *testServerSetup) sortedResults() []aggregated.MetricWithStoragePolicy {