From 81312fd30fe46e53dcb9dd1dedea01f0f6406b32 Mon Sep 17 00:00:00 2001
From: Karel Minarik
Date: Fri, 28 Jun 2019 17:38:57 +0200
Subject: [PATCH] WIP > XPack, Update Core and X-Pack test setup

* Do not delete X-Pack templates
* Use context timeouts
* Check pending tasks

// ----------------------------------------------------------------------------------------------------
// https://github.com/elastic/elasticsearch/blob/master/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

ensureNoInitializingShards();
wipeCluster();
waitForClusterStateUpdatesToFinish();
logIfThereAreRunningTasks();

wipeCluster() {
    if (hasXPack) {
        wipeRollupJobs();
        waitForPendingRollupTasks();
    }
    wipeSnapshots();
    adminClient().performRequest(new Request("DELETE", "*"));
    if (hasXPack) {
        Request request = new Request("GET", "_cat/templates");
        request.addParameter("h", "name");
        if (isXPackTemplate(template)) continue;
        adminClient().performRequest(new Request("DELETE", "_template/" + template));
    } else {
        adminClient().performRequest(new Request("DELETE", "_template/*"));
    }
    wipeClusterSettings();
    if (hasXPack) {
        deleteAllPolicies();
    }

wipeRollupJobs() {
    Response response = adminClient().performRequest(new Request("GET", "/_rollup/job/_all"));
    for (Map jobConfig : jobConfigs) {
        String jobId = (String) ((Map) jobConfig.get("config")).get("id");
        Request request = new Request("POST", "/_rollup/job/" + jobId + "/_stop");
        request.addParameter("ignore", "404");
        request.addParameter("wait_for_completion", "true");
        request.addParameter("timeout", "10s");
    }
    for (Map jobConfig : jobConfigs) {
        Request request = new Request("DELETE", "/_rollup/job/" + jobId);
        request.addParameter("ignore", "404");
    }

waitForPendingRollupTasks() {
    waitForPendingTasks(adminClient(), taskName -> taskName.startsWith("xpack/rollup/job") == false);

ensureNoInitializingShards() {
    Request request = new Request("GET", "/_cluster/health");
    request.addParameter("wait_for_no_initializing_shards", "true");
    request.addParameter("timeout", "70s");
    request.addParameter("level", "shards");
    adminClient().performRequest(request);

waitForClusterStateUpdatesToFinish() {
    assertBusy(() -> {
        Response response = adminClient().performRequest(new Request("GET", "/_cluster/pending_tasks"));
        List tasks = (List) entityAsMap(response).get("tasks");
        if (false == tasks.isEmpty()) {
            fail(message.toString());
    }, 30, TimeUnit.SECONDS);

// curl -s -k -X POST 'https://elastic:elastic@localhost:9200/_all/_ilm/remove'
deleteAllPolicies() {
    Response response = adminClient().performRequest(new Request("GET", "/_ilm/policy"));
    for (String policyName : policies.keySet()) {
        adminClient().performRequest(new Request("DELETE", "/_ilm/policy/" + policyName));
    }

// https://github.com/elastic/elasticsearch/pull/31642
//
// > At the end of every ESRestTestCase we clean the cluster which includes
// > deleting all of the templates. If xpack is installed it'll automatically
// > recreate a few templates every time they are removed. Which is slow.

isXPackTemplate(String name) {
    if (name.startsWith(".monitoring-")) {
        return true;
    }
    if (name.startsWith(".watch") || name.startsWith(".triggered_watches")) {
        return true;
    }
    if (name.startsWith(".ml-")) {
        return true;
    }
    switch (name) {
        case ".triggered_watches":
        case ".watches":
        case "logstash-index-template":
        case "security_audit_log":
            return true;
        default:
            return false;
    }
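
// The generated Go setup below mirrors this check: instead of deleting
// "_template/*" it lists the installed templates and leaves anything that
// looks X-Pack managed in place. A minimal Go sketch of that filter (the
// package and helper name are made up for illustration; the actual change
// inlines these conditions in the generator template):

package gentests // hypothetical package, for illustration only

import "strings"

// isXPackManagedTemplate reports whether a template should survive the test
// cleanup: dot-prefixed templates (.monitoring-*, .watches, .ml-*, ...) plus
// a couple of non-dot names which X-Pack recreates automatically.
func isXPackManagedTemplate(name string) bool {
    if strings.HasPrefix(name, ".") {
        return true
    }
    switch name {
    case "logstash-index-template", "security_audit_log":
        return true
    }
    return false
}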
// ----------------------------------------------------------------------------------------------------
// https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java

setupForTests() {
    waitForTemplates();
    waitForWatcher();
    enableMonitoring();

cleanup() {
    disableMonitoring();
    clearMlState();
    if (isWaitForPendingTasks()) {
        // This waits for pending tasks to complete, so must go last (otherwise
        // it could be waiting for pending tasks while monitoring is still running).
        ESRestTestCase.waitForPendingTasks(adminClient(), task -> {
            // Don't check rollup jobs because we clear them in the superclass.
            return task.contains(RollupJob.NAME);
        });

waitForTemplates() {
    for (String template : templates) {
        awaitCallApi("indices.exists_template", singletonMap("name", template), emptyList(),
            response -> true,
            () -> "Exception when waiting for [" + template + "] template to be created");
    }

waitForWatcher() {
    if (isWatcherTest()) {
        // ensure watcher is started, so that a test can stop watcher and everything still works fine

enableMonitoring() {
    if (isMonitoringTest()) {
        // Enable monitoring and waits for monitoring documents to be collected and indexed

clearMlState() {
    if (isMachineLearningTest()) {
        new MlRestTestStateCleaner(logger, adminClient()).clearMlMetadata();
    }

isMonitoringTest() {
    String testName = getTestName();
    return testName != null && (testName.contains("=monitoring/") || testName.contains("=monitoring\\"));
}

isWatcherTest() {
    String testName = getTestName();
    return testName != null && (testName.contains("=watcher/") || testName.contains("=watcher\\"));
}

isMachineLearningTest() {
    String testName = getTestName();
    return testName != null && (testName.contains("=ml/") || testName.contains("=ml\\"));
}
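
// The "Check pending tasks" item of this change gives the generated X-Pack
// setup a similar final wait (see the last hunk of the diff below): poll the
// Cluster Pending Tasks API until the queue is empty or we give up. A rough
// standalone sketch of that loop; the package, helper name and the 7.x import
// path are assumptions, and errors are ignored just as in the generated setup:

package gentests // hypothetical package, for illustration only

import (
    "encoding/json"
    "time"

    "github.com/elastic/go-elasticsearch/v7"
)

// waitForNoPendingTasks polls the pending cluster tasks until the list is
// empty, giving up after the given number of one-second attempts.
func waitForNoPendingTasks(es *elasticsearch.Client, attempts int) {
    for i := 0; i < attempts; i++ {
        var r map[string]interface{}
        res, _ := es.Cluster.PendingTasks()
        if res != nil && res.Body != nil {
            json.NewDecoder(res.Body).Decode(&r)
            res.Body.Close()
            if tasks, ok := r["tasks"].([]interface{}); ok && len(tasks) == 0 {
                return
            }
        }
        time.Sleep(time.Second)
    }
}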
// ----------------------------------------------------------------------------------------------------
// https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java

class MlRestTestStateCleaner { ... }

clearMlMetadata() {
    deleteAllDatafeeds();
    deleteAllJobs();
    deleteAllDataFrameAnalytics();
    // indices will be deleted by the ESRestTestCase class

deleteAllDatafeeds() {
    Request datafeedsRequest = new Request("GET", "/_ml/datafeeds");
    datafeedsRequest.addParameter("filter_path", "datafeeds");
    datafeeds = (List<Map<String, Object>>) XContentMapValues.extractValue("datafeeds", ESRestTestCase.entityAsMap(datafeedsResponse));
    try {
        adminClient.performRequest(new Request("POST", "/_ml/datafeeds/_all/_stop"));
    } catch (Exception e1) {
        logger.warn("failed to stop all datafeeds. Forcing stop", e1);
        try {
            adminClient.performRequest(new Request("POST", "/_ml/datafeeds/_all/_stop?force=true"));
        } catch (Exception e2) {
            logger.warn("Force-closing all data feeds failed", e2);
        }
        throw new RuntimeException("Had to resort to force-stopping datafeeds, something went wrong?", e1);
    }
    for (Map datafeed : datafeeds) {
        String datafeedId = (String) datafeed.get("datafeed_id");
        adminClient.performRequest(new Request("DELETE", "/_ml/datafeeds/" + datafeedId));
    }

deleteAllJobs() {
    Request jobsRequest = new Request("GET", "/_ml/anomaly_detectors");
    jobsRequest.addParameter("filter_path", "jobs");
    jobConfigs = (List<Map<String, Object>>) XContentMapValues.extractValue("jobs", ESRestTestCase.entityAsMap(response));
    if (jobConfigs == null) {
        return;
    }
    adminClient.performRequest(new Request("POST", "/_ml/anomaly_detectors/_all/_close"));
    for (Map jobConfig : jobConfigs) {
        String jobId = (String) jobConfig.get("job_id");
        adminClient.performRequest(new Request("DELETE", "/_ml/anomaly_detectors/" + jobId));
    }

deleteAllDataFrameAnalytics() {
    Request analyticsRequest = new Request("GET", "/_ml/data_frame/analytics?size=10000");
    analyticsRequest.addParameter("filter_path", "data_frame_analytics");
    analytics = (List<Map<String, Object>>) XContentMapValues.extractValue("data_frame_analytics", ESRestTestCase.entityAsMap(analyticsResponse));
    if (analytics == null) {
        return;
    }
    for (Map config : analytics) {
        String id = (String) config.get("id");
        adminClient.performRequest(new Request("DELETE", "/_ml/data_frame/analytics/" + id));
    }
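
// The "Use context timeouts" item of the change bounds these stop/close calls
// in the generated X-Pack setup: each request gets a context.WithTimeout plus
// the client's WithContext option (see the ML hunks in the diff below). A
// minimal sketch of the pattern, reusing the assumptions from the sketch above
// and ignoring errors to match the generated code:

package gentests // hypothetical package, for illustration only

import (
    "context"
    "time"

    "github.com/elastic/go-elasticsearch/v7"
)

// stopMLWithTimeout stops all datafeeds and closes all anomaly detection jobs,
// giving each call at most 30 seconds instead of letting it block the setup.
func stopMLWithTimeout(es *elasticsearch.Client) {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    if res, _ := es.ML.StopDatafeed("_all", es.ML.StopDatafeed.WithContext(ctx)); res != nil && res.Body != nil {
        res.Body.Close()
    }
    if res, _ := es.ML.CloseJob("_all", es.ML.CloseJob.WithContext(ctx)); res != nil && res.Body != nil {
        res.Body.Close()
    }
}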

---
 .../generate/commands/gentests/generator.go  | 123 ++++++++++++++++--
 1 file changed, 113 insertions(+), 10 deletions(-)

diff --git a/internal/cmd/generate/commands/gentests/generator.go b/internal/cmd/generate/commands/gentests/generator.go
index 076dedaf37..a1135a6f4a 100755
--- a/internal/cmd/generate/commands/gentests/generator.go
+++ b/internal/cmd/generate/commands/gentests/generator.go
@@ -193,6 +193,7 @@ func (g *Generator) genFileHeader() {
 	import (
 		encjson "encoding/json"
 		encyaml "gopkg.in/yaml.v2"
+		"context"
 		"crypto/tls"
 		"testing"
 		"time"
@@ -289,14 +290,37 @@ func (g *Generator) genCommonSetup() {
 	commonSetup := func() {
 		var res *esapi.Response
 
+		{
+			res, _ = es.Cluster.Health(es.Cluster.Health.WithWaitForNoInitializingShards(true))
+			if res != nil && res.Body != nil {
+				defer res.Body.Close()
+			}
+		}
+
 		{
 			res, _ = es.Indices.Delete([]string{"_all"})
 			if res != nil && res.Body != nil { defer res.Body.Close() }
 		}
 
 		{
-			res, _ = es.Indices.DeleteTemplate("*")
-			if res != nil && res.Body != nil { defer res.Body.Close() }
+			var r map[string]interface{}
+			res, _ = es.Indices.GetTemplate()
+			if res != nil && res.Body != nil {
+				defer res.Body.Close()
+				json.NewDecoder(res.Body).Decode(&r)
+				for templateName, _ := range r {
+					if strings.HasPrefix(templateName, ".") {
+						continue
+					}
+					if templateName == "security_audit_log" {
+						continue
+					}
+					if templateName == "logstash-index-template" {
+						continue
+					}
+					es.Indices.DeleteTemplate(templateName)
+				}
+			}
 		}
 
 		{
@@ -327,6 +351,13 @@ func (g *Generator) genCommonSetup() {
 			}
 		}
 	}
+
+	{
+		res, _ = es.Cluster.Health(es.Cluster.Health.WithWaitForStatus("yellow"))
+		if res != nil && res.Body != nil {
+			defer res.Body.Close()
+		}
+	}
 }
 	`)
@@ -338,6 +369,21 @@ func (g *Generator) genXPackSetup() {
 	xpackSetup := func() {
 		var res *esapi.Response
 
+		{
+			var r map[string]interface{}
+			res, _ = es.Indices.GetTemplate()
+			if res != nil && res.Body != nil {
+				defer res.Body.Close()
+				json.NewDecoder(res.Body).Decode(&r)
+				for templateName, _ := range r {
+					if strings.HasPrefix(templateName, ".") {
+						continue
+					}
+					es.Indices.DeleteTemplate(templateName)
+				}
+			}
+		}
+
 		{
 			res, _ = es.Watcher.DeleteWatch("my_watch")
 			if res != nil && res.Body != nil {
@@ -395,8 +441,10 @@ func (g *Generator) genXPackSetup() {
 		{
 			var r map[string]interface{}
-			es.ML.StopDatafeed("_all")
-			res, _ = es.ML.GetDatafeeds(es.ML.GetDatafeeds.WithAllowNoDatafeeds(true))
+			ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+			defer cancel()
+			es.ML.StopDatafeed("_all", es.ML.StopDatafeed.WithContext(ctx))
+			res, _ = es.ML.GetDatafeeds()
 			if res != nil && res.Body != nil {
 				defer res.Body.Close()
 				json.NewDecoder(res.Body).Decode(&r)
@@ -412,13 +460,15 @@
 		{
 			var r map[string]interface{}
-			es.ML.CloseJob("_all", es.ML.CloseJob.WithForce(true))
-			res, _ = es.ML.GetJobs(es.ML.GetJobs.WithAllowNoJobs(true))
+			ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+			defer cancel()
+			es.ML.CloseJob("_all", es.ML.CloseJob.WithContext(ctx))
+			res, _ = es.ML.GetJobs()
 			if res != nil && res.Body != nil {
 				defer res.Body.Close()
 				json.NewDecoder(res.Body).Decode(&r)
 				for _, v := range r["jobs"].([]interface{}) {
-					jobID, ok := v.(map[string]interface{})["datafeed_id"]
+					jobID, ok := v.(map[string]interface{})["job_id"]
 					if !ok {
 						continue
 					}
@@ -438,7 +488,7 @@ func (g *Generator) genXPackSetup() {
 					if !ok {
 						continue
 					}
-					es.Rollup.StopJob(jobID.(string))
+					es.Rollup.StopJob(jobID.(string), es.Rollup.StopJob.WithWaitForCompletion(true))
 					es.Rollup.DeleteJob(jobID.(string))
 				}
 			}
@@ -457,12 +507,45 @@ func (g *Generator) genXPackSetup() {
 					continue
 				}
 				taskID := fmt.Sprintf("%v:%v", v.(map[string]interface{})["node"], v.(map[string]interface{})["id"])
-				es.Tasks.Cancel(es.Tasks.Cancel.WithTaskID(taskID))
+				ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+				defer cancel()
+				es.Tasks.Cancel(es.Tasks.Cancel.WithTaskID(taskID), es.Tasks.Cancel.WithContext(ctx))
+			}
+		}
+	}
+}
+
+{
+	var r map[string]interface{}
+	res, _ = es.Snapshot.GetRepository()
+	if res != nil && res.Body != nil {
+		defer res.Body.Close()
+		json.NewDecoder(res.Body).Decode(&r)
+		for repositoryID, _ := range r {
+			var r map[string]interface{}
+			res, _ = es.Snapshot.Get(repositoryID, []string{"_all"})
+			json.NewDecoder(res.Body).Decode(&r)
+			for _, vv := range r["responses"].([]interface{}) {
+				for _, v := range vv.(map[string]interface{})["snapshots"].([]interface{}) {
+					snapshotID, ok := v.(map[string]interface{})["snapshot"]
+					if !ok {
+						continue
+					}
+					es.Snapshot.Delete(repositoryID, fmt.Sprintf("%s", snapshotID))
+				}
 			}
+			es.Snapshot.DeleteRepository([]string{fmt.Sprintf("%s", repositoryID)})
 		}
 	}
 }
 
+{
+	res, _ = es.ILM.RemovePolicy(es.ILM.RemovePolicy.WithIndex("_all"))
+	if res != nil && res.Body != nil {
+		defer res.Body.Close()
+	}
+}
+
 {
 	res, _ = es.Cluster.Health(es.Cluster.Health.WithWaitForStatus("yellow"))
 	if res != nil && res.Body != nil {
@@ -478,7 +561,7 @@
 	}
 
 	{
-		res, _ = es.Indices.Refresh(es.Indices.Refresh.WithIndex(".security*"))
+		res, _ = es.Indices.Refresh(es.Indices.Refresh.WithIndex("_all"))
 		if res != nil && res.Body != nil {
 			defer res.Body.Close()
 		}
@@ -490,6 +573,26 @@
 			defer res.Body.Close()
 		}
 	}
+
+	{
+		var i int
+		for {
+			i++
+			var r map[string]interface{}
+			res, _ = es.Cluster.PendingTasks()
+			if res != nil && res.Body != nil {
+				defer res.Body.Close()
+				json.NewDecoder(res.Body).Decode(&r)
+				if len(r["tasks"].([]interface{})) < 1 {
+					break
+				}
+			}
+			if i > 30 {
+				break
+			}
+			time.Sleep(time.Second)
+		}
+	}
 }
 	`)