Skip to content

Commit

Permalink
WIP > XPack, Update Core and X-Pack test setup
Browse files Browse the repository at this point in the history
* Do not delete X-Pack templates
* Use context timeouts
* Check pending tasks

// ----------------------------------------------------------------------------------------------------
// https://github.com/elastic/elasticsearch/blob/master/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

ensureNoInitializingShards();
wipeCluster();
waitForClusterStateUpdatesToFinish();
logIfThereAreRunningTasks();

wipeCluster() {
  if (hasXPack) {
      wipeRollupJobs();
      waitForPendingRollupTasks();
  }
  wipeSnapshots();
  adminClient().performRequest(new Request("DELETE", "*"));
  if (hasXPack) {
    Request request = new Request("GET", "_cat/templates");
    request.addParameter("h", "name");
    if (isXPackTemplate(template)) continue;
    adminClient().performRequest(new Request("DELETE", "_template/" + template));
  } else {
      adminClient().performRequest(new Request("DELETE", "_template/*"));
  }
  wipeClusterSettings();
  if (hasXPack) {
      deleteAllPolicies();
  }

wipeRollupJobs() {
  Response response = adminClient().performRequest(new Request("GET", "/_rollup/job/_all"));
  for (Map<String, Object> jobConfig : jobConfigs) {
    String jobId = (String) ((Map<String, Object>) jobConfig.get("config")).get("id");
    Request request = new Request("POST", "/_rollup/job/" + jobId + "/_stop");
    request.addParameter("ignore", "404");
    request.addParameter("wait_for_completion", "true");
    request.addParameter("timeout", "10s");
  }
  for (Map<String, Object> jobConfig : jobConfigs) {
    Request request = new Request("DELETE", "/_rollup/job/" + jobId);
    request.addParameter("ignore", "404");
  }

waitForPendingRollupTasks() {
  waitForPendingTasks(adminClient(), taskName -> taskName.startsWith("xpack/rollup/job") == false);

ensureNoInitializingShards() {
  Request request = new Request("GET", "/_cluster/health");
  request.addParameter("wait_for_no_initializing_shards", "true");
  request.addParameter("timeout", "70s");
  request.addParameter("level", "shards");
  adminClient().performRequest(request);

waitForClusterStateUpdatesToFinish() {
  assertBusy(() -> {
    Response response = adminClient().performRequest(new Request("GET", "/_cluster/pending_tasks"));
    List<?> tasks = (List<?>) entityAsMap(response).get("tasks");
    if (false == tasks.isEmpty()) {
      fail(message.toString());
  }, 30, TimeUnit.SECONDS);

// curl -s -k -X POST 'https://elastic:elastic@localhost:9200/_all/_ilm/remove'
deleteAllPolicies() {
  Response response = adminClient().performRequest(new Request("GET", "/_ilm/policy"));
  for (String policyName : policies.keySet()) {
      adminClient().performRequest(new Request("DELETE", "/_ilm/policy/" + policyName));
  }

// elastic/elasticsearch#31642
//
// > At the end of every ESRestTestCase we clean the cluster which includes
// > deleting all of the templates. If xpack is installed it'll automatically
// > recreate a few templates every time they are removed. Which is slow.
//
isXPackTemplate(String name) {
  if (name.startsWith(".monitoring-")) {
      return true;
  }
  if (name.startsWith(".watch") || name.startsWith(".triggered_watches")) {
      return true;
  }
  if (name.startsWith(".ml-")) {
      return true;
  }
  switch (name) {
  case ".triggered_watches":
  case ".watches":
  case "logstash-index-template":
  case "security_audit_log":
      return true;
  default:
      return false;
  }

// ----------------------------------------------------------------------------------------------------
// https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java

setupForTests() {
  waitForTemplates();
  waitForWatcher();
  enableMonitoring();

cleanup()
  disableMonitoring();
  clearMlState();
  if (isWaitForPendingTasks()) {
    // This waits for pending tasks to complete, so must go last (otherwise
    // it could be waiting for pending tasks while monitoring is still running).
    ESRestTestCase.waitForPendingTasks(adminClient(), task -> {
            // Don't check rollup jobs because we clear them in the superclass.
            return task.contains(RollupJob.NAME);
    });

waitForTemplates() {
  for (String template : templates) {
      awaitCallApi("indices.exists_template", singletonMap("name", template), emptyList(),
      response -> true,
      () -> "Exception when waiting for [" + template + "] template to be created");
  }

waitForWatcher() {
  if (isWatcherTest()) {
    // ensure watcher is started, so that a test can stop watcher and everything still works fine

enableMonitoring() {
  if (isMonitoringTest()) {
    // Enable monitoring and waits for monitoring documents to be collected and indexed

clearMlState() {
  if (isMachineLearningTest()) {
      new MlRestTestStateCleaner(logger, adminClient()).clearMlMetadata();
  }

isMonitoringTest() {
    String testName = getTestName();
    return testName != null && (testName.contains("=monitoring/") || testName.contains("=monitoring\\"));
}

isWatcherTest() {
    String testName = getTestName();
    return testName != null && (testName.contains("=watcher/") || testName.contains("=watcher\\"));
}

isMachineLearningTest() {
    String testName = getTestName();
    return testName != null && (testName.contains("=ml/") || testName.contains("=ml\\"));
}

// ----------------------------------------------------------------------------------------------------
// https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java

class MlRestTestStateCleaner { ... }

clearMlMetadata() {
  deleteAllDatafeeds();
  deleteAllJobs();
  deleteAllDataFrameAnalytics();
  // indices will be deleted by the ESRestTestCase class

deleteAllDatafeeds() {
  Request datafeedsRequest = new Request("GET", "/_ml/datafeeds");
  datafeedsRequest.addParameter("filter_path", "datafeeds");
  datafeeds = (List<Map<String, Object>>) XContentMapValues.extractValue("datafeeds", ESRestTestCase.entityAsMap(datafeedsResponse));
  try {
      adminClient.performRequest(new Request("POST", "/_ml/datafeeds/_all/_stop"));
  } catch (Exception e1) {
      logger.warn("failed to stop all datafeeds. Forcing stop", e1);
      try {
          adminClient.performRequest(new Request("POST", "/_ml/datafeeds/_all/_stop?force=true"));
      } catch (Exception e2) {
          logger.warn("Force-closing all data feeds failed", e2);
      }
      throw new RuntimeException(
              "Had to resort to force-stopping datafeeds, something went wrong?", e1);
  }
  for (Map<String, Object> datafeed : datafeeds) {
      String datafeedId = (String) datafeed.get("datafeed_id");
      adminClient.performRequest(new Request("DELETE", "/_ml/datafeeds/" + datafeedId));
  }

deleteAllJobs() {
  Request jobsRequest = new Request("GET", "/_ml/anomaly_detectors");
  jobsRequest.addParameter("filter_path", "jobs");
  jobConfigs = (List<Map<String, Object>>) XContentMapValues.extractValue("jobs", ESRestTestCase.entityAsMap(response));
  if (jobConfigs == null) {
      return;
  }
  adminClient.performRequest(new Request("POST", "/_ml/anomaly_detectors/_all/_close"));
  for (Map<String, Object> jobConfig : jobConfigs) {
      String jobId = (String) jobConfig.get("job_id");
      adminClient.performRequest(new Request("DELETE", "/_ml/anomaly_detectors/" + jobId));
  }

deleteAllDataFrameAnalytics() {
  Request analyticsRequest = new Request("GET", "/_ml/data_frame/analytics?size=10000");
  analyticsRequest.addParameter("filter_path", "data_frame_analytics");
  analytics = (List<Map<String, Object>>) XContentMapValues.extractValue("data_frame_analytics", ESRestTestCase.entityAsMap(analyticsResponse));
  if (analytics == null) {
      return;
  }

  for (Map<String, Object> config : analytics) {
      String id = (String) config.get("id");
      adminClient.performRequest(new Request("DELETE", "/_ml/data_frame/analytics/" + id));
  }
  • Loading branch information
karmi committed Jun 29, 2019
1 parent e6cfc3f commit 81312fd
Showing 1 changed file with 113 additions and 10 deletions.
123 changes: 113 additions & 10 deletions internal/cmd/generate/commands/gentests/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func (g *Generator) genFileHeader() {
import (
encjson "encoding/json"
encyaml "gopkg.in/yaml.v2"
"context"
"crypto/tls"
"testing"
"time"
Expand Down Expand Up @@ -289,14 +290,37 @@ func (g *Generator) genCommonSetup() {
commonSetup := func() {
var res *esapi.Response
{
res, _ = es.Cluster.Health(es.Cluster.Health.WithWaitForNoInitializingShards(true))
if res != nil && res.Body != nil {
defer res.Body.Close()
}
}
{
res, _ = es.Indices.Delete([]string{"_all"})
if res != nil && res.Body != nil { defer res.Body.Close() }
}
{
res, _ = es.Indices.DeleteTemplate("*")
if res != nil && res.Body != nil { defer res.Body.Close() }
var r map[string]interface{}
res, _ = es.Indices.GetTemplate()
if res != nil && res.Body != nil {
defer res.Body.Close()
json.NewDecoder(res.Body).Decode(&r)
for templateName, _ := range r {
if strings.HasPrefix(templateName, ".") {
continue
}
if templateName == "security_audit_log" {
continue
}
if templateName == "logstash-index-template" {
continue
}
es.Indices.DeleteTemplate(templateName)
}
}
}
{
Expand Down Expand Up @@ -327,6 +351,13 @@ func (g *Generator) genCommonSetup() {
}
}
}
{
res, _ = es.Cluster.Health(es.Cluster.Health.WithWaitForStatus("yellow"))
if res != nil && res.Body != nil {
defer res.Body.Close()
}
}
}
`)
Expand All @@ -338,6 +369,21 @@ func (g *Generator) genXPackSetup() {
xpackSetup := func() {
var res *esapi.Response
{
var r map[string]interface{}
res, _ = es.Indices.GetTemplate()
if res != nil && res.Body != nil {
defer res.Body.Close()
json.NewDecoder(res.Body).Decode(&r)
for templateName, _ := range r {
if strings.HasPrefix(templateName, ".") {
continue
}
es.Indices.DeleteTemplate(templateName)
}
}
}
{
res, _ = es.Watcher.DeleteWatch("my_watch")
if res != nil && res.Body != nil {
Expand Down Expand Up @@ -395,8 +441,10 @@ func (g *Generator) genXPackSetup() {
{
var r map[string]interface{}
es.ML.StopDatafeed("_all")
res, _ = es.ML.GetDatafeeds(es.ML.GetDatafeeds.WithAllowNoDatafeeds(true))
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
es.ML.StopDatafeed("_all", es.ML.StopDatafeed.WithContext(ctx))
res, _ = es.ML.GetDatafeeds()
if res != nil && res.Body != nil {
defer res.Body.Close()
json.NewDecoder(res.Body).Decode(&r)
Expand All @@ -412,13 +460,15 @@ func (g *Generator) genXPackSetup() {
{
var r map[string]interface{}
es.ML.CloseJob("_all", es.ML.CloseJob.WithForce(true))
res, _ = es.ML.GetJobs(es.ML.GetJobs.WithAllowNoJobs(true))
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
es.ML.CloseJob("_all", es.ML.CloseJob.WithContext(ctx))
res, _ = es.ML.GetJobs()
if res != nil && res.Body != nil {
defer res.Body.Close()
json.NewDecoder(res.Body).Decode(&r)
for _, v := range r["jobs"].([]interface{}) {
jobID, ok := v.(map[string]interface{})["datafeed_id"]
jobID, ok := v.(map[string]interface{})["job_id"]
if !ok {
continue
}
Expand All @@ -438,7 +488,7 @@ func (g *Generator) genXPackSetup() {
if !ok {
continue
}
es.Rollup.StopJob(jobID.(string))
es.Rollup.StopJob(jobID.(string), es.Rollup.StopJob.WithWaitForCompletion(true))
es.Rollup.DeleteJob(jobID.(string))
}
}
Expand All @@ -457,12 +507,45 @@ func (g *Generator) genXPackSetup() {
continue
}
taskID := fmt.Sprintf("%v:%v", v.(map[string]interface{})["node"], v.(map[string]interface{})["id"])
es.Tasks.Cancel(es.Tasks.Cancel.WithTaskID(taskID))
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
es.Tasks.Cancel(es.Tasks.Cancel.WithTaskID(taskID), es.Tasks.Cancel.WithContext(ctx))
}
}
}
}
{
var r map[string]interface{}
res, _ = es.Snapshot.GetRepository()
if res != nil && res.Body != nil {
defer res.Body.Close()
json.NewDecoder(res.Body).Decode(&r)
for repositoryID, _ := range r {
var r map[string]interface{}
res, _ = es.Snapshot.Get(repositoryID, []string{"_all"})
json.NewDecoder(res.Body).Decode(&r)
for _, vv := range r["responses"].([]interface{}) {
for _, v := range vv.(map[string]interface{})["snapshots"].([]interface{}) {
snapshotID, ok := v.(map[string]interface{})["snapshot"]
if !ok {
continue
}
es.Snapshot.Delete(repositoryID, fmt.Sprintf("%s", snapshotID))
}
}
es.Snapshot.DeleteRepository([]string{fmt.Sprintf("%s", repositoryID)})
}
}
}
{
res, _ = es.ILM.RemovePolicy(es.ILM.RemovePolicy.WithIndex("_all"))
if res != nil && res.Body != nil {
defer res.Body.Close()
}
}
{
res, _ = es.Cluster.Health(es.Cluster.Health.WithWaitForStatus("yellow"))
if res != nil && res.Body != nil {
Expand All @@ -478,7 +561,7 @@ func (g *Generator) genXPackSetup() {
}
{
res, _ = es.Indices.Refresh(es.Indices.Refresh.WithIndex(".security*"))
res, _ = es.Indices.Refresh(es.Indices.Refresh.WithIndex("_all"))
if res != nil && res.Body != nil {
defer res.Body.Close()
}
Expand All @@ -490,6 +573,26 @@ func (g *Generator) genXPackSetup() {
defer res.Body.Close()
}
}
{
var i int
for {
i++
var r map[string]interface{}
res, _ = es.Cluster.PendingTasks()
if res != nil && res.Body != nil {
defer res.Body.Close()
json.NewDecoder(res.Body).Decode(&r)
if len(r["tasks"].([]interface{})) < 1 {
break
}
}
if i > 30 {
break
}
time.Sleep(time.Second)
}
}
}
`)
Expand Down

0 comments on commit 81312fd

Please sign in to comment.