From ec5cc8df28392d7d41f05f95b59cc08ee31fdc05 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 17 Aug 2023 19:41:02 +0700 Subject: [PATCH] Fix downsample failure when FLS/DLS is enabled. If FLS/DLS is enabled (this is the case when trial/licence is active and security is enabled) then invoking the downsample API results in immediate failure. The downsample shard persistent task executor opens a searcher, but security didn't set indices permissions in the thread local (this happens via SecurityActionFilter). This will only happen on indices actions (which are actions with a request that implement IndicesRequest. This change does this by delegating to a transport action that executes always locally, and this way security prepares thread local headers correctly. This adds another layer of indirection, but without doing this FLS/DLS wouldn't work. Closes #98569 --- .../downsample/qa/with-security/build.gradle | 34 ++ .../xpack/downsample/DownsampleRestIT.java | 35 ++ .../test/downsample/10_basic.yml | 378 ++++++++++++++++++ .../xpack/downsample/Downsample.java | 66 +-- .../downsample/DownsampleShardIndexer.java | 8 - ...DownsampleShardPersistentTaskExecutor.java | 234 +++++++---- .../downsample/DownsampleShardTaskParams.java | 12 - 7 files changed, 614 insertions(+), 153 deletions(-) create mode 100644 x-pack/plugin/downsample/qa/with-security/build.gradle create mode 100644 x-pack/plugin/downsample/qa/with-security/src/yamlRestTest/java/org/elasticsearch/xpack/downsample/DownsampleRestIT.java create mode 100644 x-pack/plugin/downsample/qa/with-security/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml diff --git a/x-pack/plugin/downsample/qa/with-security/build.gradle b/x-pack/plugin/downsample/qa/with-security/build.gradle new file mode 100644 index 0000000000000..5eed735950187 --- /dev/null +++ b/x-pack/plugin/downsample/qa/with-security/build.gradle @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import org.elasticsearch.gradle.Version +import org.elasticsearch.gradle.internal.info.BuildParams + +apply plugin: 'elasticsearch.legacy-yaml-rest-test' +apply plugin: 'elasticsearch.legacy-yaml-rest-compat-test' + +dependencies { + yamlRestTestImplementation project(path: xpackModule('rollup')) +} + +restResources { + restApi { + include '_common', 'bulk', 'cluster', 'indices', 'search', 'ingest.put_pipeline', 'ingest.delete_pipeline' + } +} + +testClusters.configureEach { + testDistribution = 'DEFAULT' + setting 'xpack.license.self_generated.type', 'trial' + setting 'xpack.security.enabled', 'true' + user username: 'elastic_admin', password: 'admin-password' +} + +if (BuildParams.inFipsJvm){ + // This test cluster is using a BASIC license and FIPS 140 mode is not supported in BASIC + tasks.named("yamlRestTest").configure{enabled = false } +} diff --git a/x-pack/plugin/downsample/qa/with-security/src/yamlRestTest/java/org/elasticsearch/xpack/downsample/DownsampleRestIT.java b/x-pack/plugin/downsample/qa/with-security/src/yamlRestTest/java/org/elasticsearch/xpack/downsample/DownsampleRestIT.java new file mode 100644 index 0000000000000..7bdbd5e6b0937 --- /dev/null +++ b/x-pack/plugin/downsample/qa/with-security/src/yamlRestTest/java/org/elasticsearch/xpack/downsample/DownsampleRestIT.java @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.downsample; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.elasticsearch.common.settings.SecureString; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; +import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase; + +public class DownsampleRestIT extends ESClientYamlSuiteTestCase { + + public DownsampleRestIT(final ClientYamlTestCandidate testCandidate) { + super(testCandidate); + } + + @Override + protected Settings restClientSettings() { + String authentication = basicAuthHeaderValue("elastic_admin", new SecureString("admin-password".toCharArray())); + return Settings.builder().put(super.restClientSettings()).put(ThreadContext.PREFIX + ".Authorization", authentication).build(); + } + + @ParametersFactory + public static Iterable parameters() throws Exception { + return ESClientYamlSuiteTestCase.createParameters(); + } + +} diff --git a/x-pack/plugin/downsample/qa/with-security/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml b/x-pack/plugin/downsample/qa/with-security/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml new file mode 100644 index 0000000000000..3134287f86460 --- /dev/null +++ b/x-pack/plugin/downsample/qa/with-security/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml @@ -0,0 +1,378 @@ +setup: + - skip: + version: " - 8.4.99" + reason: "rollup renamed to downsample in 8.5.0" + + - do: + indices.create: + index: test + body: + settings: + number_of_shards: 1 + number_of_replicas: 0 + index: + mode: time_series + routing_path: [metricset, k8s.pod.uid] + time_series: + start_time: 2021-04-28T00:00:00Z + end_time: 2021-04-29T00:00:00Z + mappings: + properties: + "@timestamp": + type: date + metricset: + type: keyword + time_series_dimension: true + k8s: + properties: + pod: + properties: + uid: + type: keyword + time_series_dimension: true + name: + type: keyword + created_at: + type: date_nanos + running: + type: boolean + number_of_containers: + type: integer + ip: + type: ip + tags: + type: keyword + values: + type: integer + multi-counter: + type: long + time_series_metric: counter + multi-gauge: + type: integer + time_series_metric: gauge + network: + properties: + tx: + type: long + time_series_metric: gauge + rx: + type: long + time_series_metric: gauge + - do: + bulk: + refresh: true + index: test + body: + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "multi-counter" : [10, 11, 12], "multi-gauge": [100, 200, 150], "network": {"tx": 2001818691, "rx": 802133794}, "created_at": "2021-04-28T19:34:00.000Z", "running": false, "number_of_containers": 2, "tags": ["backend", "prod"], "values": [2, 3, 6]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:50:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.26", "multi-counter" : [21, 22, 23], "multi-gauge": [90, 91, 95], "network": {"tx": 2005177954, "rx": 801479970}, "created_at": "2021-04-28T19:35:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west1"], "values": [1, 1, 3]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T20:50:44.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.41", "multi-counter" : [1, 5, 10], "multi-gauge": [103, 110, 109], "network": {"tx": 2006223737, "rx": 802337279}, "created_at": "2021-04-28T19:36:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west2"], "values": [4, 1, 2]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T20:51:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.22", "multi-counter" : [101, 102, 105], "multi-gauge": [100, 100, 100], "network": {"tx": 2012916202, "rx": 803685721}, "created_at": "2021-04-28T19:37:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod"], "values": [2, 3, 1]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:50:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.33", "multi-counter" : [7, 11, 44], "multi-gauge": [100, 100, 102], "network": {"tx": 1434521831, "rx": 530575198}, "created_at": "2021-04-28T19:42:00.000Z", "running": false, "number_of_containers": 1, "tags": ["backend", "test"], "values": [2, 3, 4]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:50:23.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.56", "multi-counter" : [0, 0, 1], "multi-gauge": [101, 102, 102], "network": {"tx": 1434577921, "rx": 530600088}, "created_at": "2021-04-28T19:43:00.000Z", "running": false, "number_of_containers": 1, "tags": ["backend", "test", "us-west2"], "values": [2, 1, 1]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T19:50:53.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.37", "multi-counter" : [1000, 1001, 1002], "multi-gauge": [99, 100, 110], "network": {"tx": 1434587694, "rx": 530604797}, "created_at": "2021-04-28T19:44:00.000Z", "running": true, "number_of_containers": 1, "tags": ["backend", "test", "us-west1"], "values": [4, 5, 2]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T19:51:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.120", "multi-counter" : [76, 77, 78], "multi-gauge": [95, 98, 100], "network": {"tx": 1434595272, "rx": 530605511}, "created_at": "2021-04-28T19:45:00.000Z", "running": true, "number_of_containers": 1, "tags": ["backend", "test", "us-west1"], "values": [3, 2, 1]}}}' + + - do: + indices.put_settings: + index: test + body: + index.blocks.write: true + + - do: + indices.create: + index: test-histogram + body: + settings: + number_of_shards: 1 + number_of_replicas: 0 + index: + mode: time_series + routing_path: [ metricset, k8s.pod.uid ] + time_series: + start_time: 2021-04-28T00:00:00Z + end_time: 2021-04-29T00:00:00Z + mappings: + properties: + "@timestamp": + type: date + metricset: + type: keyword + time_series_dimension: true + k8s: + properties: + pod: + properties: + uid: + type: keyword + time_series_dimension: true + name: + type: keyword + latency: + type: histogram + empty-histogram: + type: histogram + network: + properties: + tx: + type: long + time_series_metric: gauge + rx: + type: long + time_series_metric: gauge + - do: + bulk: + refresh: true + index: test-histogram + body: + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "network": {"tx": 2001818691, "rx": 802133794}, "latency": {"counts": [10, 12, 20, 5], "values": [1.0, 10.0, 100.0, 1000.0]}}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:55:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "network": {"tx": 2001819988, "rx": 802133911}, "latency": {"counts": [8, 7, 10, 12], "values": [1.0, 2.0, 5.0, 10.0]}}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T19:00:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "network": {"tx": 1001818691, "rx": 502133794}, "latency": {"counts": [1, 5, 5, 22], "values": [1.0, 10.0, 100.0, 1000.0]}}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T19:05:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "network": {"tx": 1001822087, "rx": 502134222}, "latency": {"counts": [7, 15, 10, 10], "values": [1.0, 2.0, 5.0, 10.0]}}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:51:20.467Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "network": {"tx": 1781818691, "rx": 533135238}, "latency": {"counts": [2, 4, 16, 4], "values": [1.0, 2.0, 5.0, 10.0]}}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:56:20.467Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "network": {"tx": 1781818691, "rx": 533135567}, "latency": {"counts": [2, 2, 8, 8], "values": [1.0, 10.0, 100.0, 1000.0]}}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T19:01:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "network": {"tx": 1651818691, "rx": 487133866}, "latency": {"counts": [4, 5, 4, 13], "values": [1.0, 10.0, 100.0, 1000.0]}}}}' + + - do: + indices.put_settings: + index: test-histogram + body: + index.blocks.write: true + + - do: + indices.create: + index: test-object + body: + settings: + number_of_shards: 1 + number_of_replicas: 0 + index: + mode: time_series + routing_path: [ metricset, k8s.pod.uid ] + time_series: + start_time: 2021-04-28T00:00:00Z + end_time: 2021-04-29T00:00:00Z + mappings: + properties: + "@timestamp": + type: date + metricset: + type: keyword + time_series_dimension: true + k8s: + properties: + pod: + properties: + uid: + type: keyword + time_series_dimension: true + name: + type: keyword + agent: + type: object + properties: + id: + type: keyword + value: + type: long + time_series_metric: gauge + + - do: + bulk: + refresh: true + index: test-object + body: + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "agent": { "id": "first", "version": "2.0.4" }, "value": 10 }}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:50:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "agent": { "id": "first", "version": "2.0.4" }, "value": 20 }}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T20:50:44.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "agent": { "id": "first", "version": "2.0.4" }, "value": 12 }}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T20:51:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "agent": { "id": "first", "version": "2.0.4" }, "value": 15 }}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:50:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "agent": { "id": "second", "version": "2.1.7" }, "value": 9 }}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:50:23.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "agent": { "id": "second", "version": "2.1.7" }, "value": 16 }}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T19:50:53.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "agent": { "id": "second", "version": "2.1.7" }, "value": 25 }}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T19:51:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "agent": { "id": "second", "version": "2.1.7" }, "value": 17 }}}' + + - do: + indices.put_settings: + index: test-object + body: + index.blocks.write: true + + - do: + indices.create: + index: test-empty-missing + body: + settings: + number_of_shards: 1 + number_of_replicas: 0 + index: + mode: time_series + routing_path: [ metricset, k8s.pod.uid ] + time_series: + start_time: 2021-04-28T00:00:00Z + end_time: 2021-04-29T00:00:00Z + mappings: + properties: + "@timestamp": + type: date + metricset: + type: keyword + time_series_dimension: true + k8s: + properties: + pod: + properties: + uid: + type: keyword + time_series_dimension: true + name: + type: keyword + value: + type: integer + time_series_metric: gauge + label: + type: keyword + + - do: + bulk: + refresh: true + index: test-empty-missing + body: + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:55:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "value": 10 }}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "value": 20, "label": null, "unmapped": null }}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:45:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "value": 30, "label": "abc", "unmapped": "abc" }}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:40:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "value": 40, "label": "xyz", "unmapped": "xyz" }}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:55:20.467Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "value": 10 }}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:50:20.467Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "value": 20, "label": null, "unmapped": null }}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:45:20.467Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "value": 30, "label": "xyz", "unmapped": "xyz" }}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:40:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "value": 40, "label": "abc", "unmapped": "abc" }}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:55:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e9597ab", "value": 10 }}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e9597ab", "value": 20, "label": null, "unmapped": null }}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:45:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e9597ab", "value": 30, "label": null }}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:40:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e9597ab", "value": 40, "unmapped": null }}}' + + - do: + indices.put_settings: + index: test-empty-missing + body: + index.blocks.write: true + +--- +"Downsample index": + - skip: + version: " - 8.4.99" + reason: "Downsampling GA-ed in 8.7.0" + + - do: + indices.downsample: + index: test + target_index: test-downsample + body: > + { + "fixed_interval": "1h" + } + - is_true: acknowledged + + - do: + search: + index: test-downsample + body: + sort: [ "_tsid", "@timestamp" ] + + - length: { hits.hits: 4 } + - match: { hits.hits.0._source._doc_count: 2 } + - match: { hits.hits.0._source.k8s.pod.uid: 947e4ced-1786-4e53-9e0c-5c447e959507 } + - match: { hits.hits.0._source.metricset: pod } + - match: { hits.hits.0._source.@timestamp: 2021-04-28T18:00:00.000Z } + - match: { hits.hits.0._source.k8s.pod.multi-counter: 21 } + - match: { hits.hits.0._source.k8s.pod.multi-gauge.min: 90 } + - match: { hits.hits.0._source.k8s.pod.multi-gauge.max: 200 } + - match: { hits.hits.0._source.k8s.pod.multi-gauge.sum: 726 } + - match: { hits.hits.0._source.k8s.pod.multi-gauge.value_count: 6 } + - match: { hits.hits.0._source.k8s.pod.network.tx.min: 2001818691 } + - match: { hits.hits.0._source.k8s.pod.network.tx.max: 2005177954 } + - match: { hits.hits.0._source.k8s.pod.network.tx.value_count: 2 } + - match: { hits.hits.0._source.k8s.pod.ip: "10.10.55.26" } + - match: { hits.hits.0._source.k8s.pod.created_at: "2021-04-28T19:35:00.000Z" } + - match: { hits.hits.0._source.k8s.pod.number_of_containers: 2 } + - match: { hits.hits.0._source.k8s.pod.tags: ["backend", "prod", "us-west1"] } + - match: { hits.hits.0._source.k8s.pod.values: [1, 1, 3] } + - is_true: hits.hits.0._source.k8s.pod.running + + # Assert rollup index settings + - do: + indices.get_settings: + index: test-downsample + + - match: { test-downsample.settings.index.mode: time_series } + - match: { test-downsample.settings.index.time_series.end_time: 2021-04-29T00:00:00Z } + - match: { test-downsample.settings.index.time_series.start_time: 2021-04-28T00:00:00Z } + - match: { test-downsample.settings.index.routing_path: [ "metricset", "k8s.pod.uid"] } + - match: { test-downsample.settings.index.downsample.source.name: test } + - match: { test-downsample.settings.index.number_of_shards: "1" } + - match: { test-downsample.settings.index.number_of_replicas: "0" } + + # Assert rollup index mapping + - do: + indices.get_mapping: + index: test-downsample + + - match: { test-downsample.mappings.properties.@timestamp.type: date } + - match: { test-downsample.mappings.properties.@timestamp.meta.fixed_interval: 1h } + - match: { test-downsample.mappings.properties.@timestamp.meta.time_zone: UTC } + - match: { test-downsample.mappings.properties.k8s.properties.pod.properties.multi-gauge.type: aggregate_metric_double } + - match: { test-downsample.mappings.properties.k8s.properties.pod.properties.multi-gauge.metrics: [ "min", "max", "sum", "value_count" ] } + - match: { test-downsample.mappings.properties.k8s.properties.pod.properties.multi-gauge.default_metric: max } + - match: { test-downsample.mappings.properties.k8s.properties.pod.properties.multi-gauge.time_series_metric: gauge } + - match: { test-downsample.mappings.properties.k8s.properties.pod.properties.multi-counter.type: long } + - match: { test-downsample.mappings.properties.k8s.properties.pod.properties.multi-counter.time_series_metric: counter } + - match: { test-downsample.mappings.properties.k8s.properties.pod.properties.uid.type: keyword } + - match: { test-downsample.mappings.properties.k8s.properties.pod.properties.uid.time_series_dimension: true } + + + # Assert source index has not been deleted + - do: + indices.get: + index: test + + # Assert rollup index has been force merged + - do: + indices.segments: + index: test-downsample + + - match: { _shards.total: 1} + - match: { indices.test-downsample.shards.0.0.num_committed_segments: 1} + - match: { indices.test-downsample.shards.0.0.num_search_segments: 1} diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java index fbdf3dc78a38e..d48fd568279d0 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java @@ -13,7 +13,6 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; @@ -22,31 +21,23 @@ import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.env.Environment; -import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.persistent.PersistentTaskParams; import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; -import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.tracing.Tracer; -import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xpack.core.downsample.DownsampleIndexerAction; import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState; import org.elasticsearch.xpack.core.downsample.DownsampleShardTask; -import java.util.Collection; import java.util.List; import java.util.function.Supplier; @@ -55,8 +46,6 @@ public class Downsample extends Plugin implements ActionPlugin, PersistentTaskPl public static final String DOWSAMPLE_TASK_THREAD_POOL_NAME = "downsample_indexing"; private static final int DOWNSAMPLE_TASK_THREAD_POOL_QUEUE_SIZE = 256; - private IndicesService indicesService; - @Override public List> getExecutorBuilders(Settings settings) { final FixedExecutorBuilder downsample = new FixedExecutorBuilder( @@ -74,7 +63,11 @@ public List> getExecutorBuilders(Settings settings) { public List> getActions() { return List.of( new ActionHandler<>(DownsampleIndexerAction.INSTANCE, TransportDownsampleIndexerAction.class), - new ActionHandler<>(DownsampleAction.INSTANCE, TransportDownsampleAction.class) + new ActionHandler<>(DownsampleAction.INSTANCE, TransportDownsampleAction.class), + new ActionHandler<>( + DownsampleShardPersistentTaskExecutor.DelegatingAction.INSTANCE, + DownsampleShardPersistentTaskExecutor.DelegatingAction.TA.class + ) ); } @@ -99,14 +92,7 @@ public List> getPersistentTasksExecutor( SettingsModule settingsModule, IndexNameExpressionResolver expressionResolver ) { - return List.of( - new DownsampleShardPersistentTaskExecutor( - client, - this.indicesService, - DownsampleShardTask.TASK_NAME, - DOWSAMPLE_TASK_THREAD_POOL_NAME - ) - ); + return List.of(new DownsampleShardPersistentTaskExecutor(client, DownsampleShardTask.TASK_NAME, DOWSAMPLE_TASK_THREAD_POOL_NAME)); } @Override @@ -136,46 +122,8 @@ public List getNamedWriteables() { new NamedWriteableRegistry.Entry( PersistentTaskParams.class, DownsampleShardTaskParams.NAME, - DownsampleShardTaskParams::readFromStream + DownsampleShardTaskParams::new ) ); } - - @Override - public Collection createComponents( - final Client client, - final ClusterService clusterService, - final ThreadPool threadPool, - final ResourceWatcherService resourceWatcherService, - final ScriptService scriptService, - final NamedXContentRegistry xContentRegistry, - final Environment environment, - final NodeEnvironment nodeEnvironment, - final NamedWriteableRegistry namedWriteableRegistry, - final IndexNameExpressionResolver indexNameExpressionResolver, - final Supplier repositoriesServiceSupplier, - final Tracer tracer, - final AllocationService allocationService, - final IndicesService indicesService - ) { - final Collection components = super.createComponents( - client, - clusterService, - threadPool, - resourceWatcherService, - scriptService, - xContentRegistry, - environment, - nodeEnvironment, - namedWriteableRegistry, - indexNameExpressionResolver, - repositoriesServiceSupplier, - tracer, - allocationService, - indicesService - ); - - this.indicesService = indicesService; - return components; - } } diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java index 460401650aed3..0260fd2df5750 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java @@ -172,10 +172,6 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept if (task.getNumIndexed() != task.getNumSent()) { task.setDownsampleShardIndexerStatus(DownsampleShardIndexerStatus.FAILED); - task.updatePersistentTaskState( - new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.FAILED, null), - ActionListener.noop() - ); final String error = "Downsampling task [" + task.getPersistentTaskId() + "] on shard " @@ -199,10 +195,6 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept + task.getNumFailed() + "]"; logger.info(error); - task.updatePersistentTaskState( - new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.FAILED, null), - ActionListener.noop() - ); throw new DownsampleShardIndexerException(error, false); } diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java index 8910fca95cdb3..2bafb2d7ed5ef 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java @@ -10,13 +10,22 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.index.IndexService; import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -26,7 +35,10 @@ import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus; import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState; import org.elasticsearch.xpack.core.downsample.DownsampleShardTask; @@ -37,19 +49,12 @@ import java.util.Objects; public class DownsampleShardPersistentTaskExecutor extends PersistentTasksExecutor { - private static final Logger logger = LogManager.getLogger(DownsampleShardPersistentTaskExecutor.class); + private static final Logger LOGGER = LogManager.getLogger(DownsampleShardPersistentTaskExecutor.class); private final Client client; - private final IndicesService indicesService; - public DownsampleShardPersistentTaskExecutor( - final Client client, - final IndicesService indicesService, - final String taskName, - final String executorName - ) { + public DownsampleShardPersistentTaskExecutor(final Client client, final String taskName, final String executorName) { super(taskName, executorName); this.client = Objects.requireNonNull(client); - this.indicesService = Objects.requireNonNull(indicesService); } @Override @@ -65,73 +70,12 @@ protected void nodeOperation( client.search( searchRequest, ActionListener.wrap( - searchResponse -> fork(task, params, searchResponse.getHits().getHits()), - e -> fork(task, params, new SearchHit[] {}) + searchResponse -> delegate(task, params, searchResponse.getHits().getHits()), + e -> delegate(task, params, new SearchHit[] {}) ) ); } - private void fork( - final AllocatedPersistentTask task, - final DownsampleShardTaskParams params, - final SearchHit[] lastDownsampledTsidHits - ) { - client.threadPool().executor(Downsample.DOWSAMPLE_TASK_THREAD_POOL_NAME).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - task.markAsFailed(e); - } - - @Override - protected void doRun() throws Exception { - startDownsampleShardIndexer(task, params, lastDownsampledTsidHits); - } - }); - } - - private void startDownsampleShardIndexer( - final AllocatedPersistentTask task, - final DownsampleShardTaskParams params, - final SearchHit[] lastDownsampleTsidHits - ) { - final DownsampleShardPersistentTaskState initialState = lastDownsampleTsidHits.length == 0 - ? new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.INITIALIZED, null) - : new DownsampleShardPersistentTaskState( - DownsampleShardIndexerStatus.STARTED, - Arrays.stream(lastDownsampleTsidHits).findFirst().get().field("_tsid").getValue() - ); - final DownsampleShardIndexer downsampleShardIndexer = new DownsampleShardIndexer( - (DownsampleShardTask) task, - client, - getIndexService(indicesService, params), - params.shardId(), - params.downsampleIndex(), - params.downsampleConfig(), - params.metrics(), - params.labels(), - initialState - ); - try { - downsampleShardIndexer.execute(); - task.markAsCompleted(); - } catch (final DownsampleShardIndexerException e) { - if (e.isRetriable()) { - logger.error("Downsampling task [" + task.getPersistentTaskId() + " retriable failure [" + e.getMessage() + "]"); - task.markAsLocallyAborted(e.getMessage()); - } else { - logger.error("Downsampling task [" + task.getPersistentTaskId() + " non retriable failure [" + e.getMessage() + "]"); - task.markAsFailed(e); - } - } catch (final Exception e) { - logger.error("Downsampling task [" + task.getPersistentTaskId() + " non-retriable failure [" + e.getMessage() + "]"); - task.markAsFailed(e); - } - } - - private static IndexService getIndexService(final IndicesService indicesService, final DownsampleShardTaskParams params) { - return indicesService.indexService(params.shardId().getIndex()); - } - @Override protected AllocatedPersistentTask createTask( long id, @@ -185,6 +129,148 @@ public PersistentTasksCustomMetadata.Assignment getAssignment( @Override public String getExecutor() { - return Downsample.DOWSAMPLE_TASK_THREAD_POOL_NAME; + // The delegate action forks to the a downsample thread: + return ThreadPool.Names.SAME; + } + + private void delegate( + final AllocatedPersistentTask task, + final DownsampleShardTaskParams params, + final SearchHit[] lastDownsampledTsidHits + ) { + client.execute( + DelegatingAction.INSTANCE, + new DelegatingAction.Request((DownsampleShardTask) task, lastDownsampledTsidHits, params), + ActionListener.wrap(empty -> {}, e -> { + LOGGER.error("error while delegating", e); + markAsFailed(task, e); + }) + ); + } + + static void realNodeOperation( + Client client, + IndicesService indicesService, + DownsampleShardTask task, + DownsampleShardTaskParams params, + SearchHit[] lastDownsampleTsidHits + ) { + client.threadPool().executor(Downsample.DOWSAMPLE_TASK_THREAD_POOL_NAME).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + markAsFailed(task, e); + } + + @Override + protected void doRun() throws Exception { + final var initialState = lastDownsampleTsidHits.length == 0 + ? new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.INITIALIZED, null) + : new DownsampleShardPersistentTaskState( + DownsampleShardIndexerStatus.STARTED, + Arrays.stream(lastDownsampleTsidHits).findFirst().get().field("_tsid").getValue() + ); + final var downsampleShardIndexer = new DownsampleShardIndexer( + task, + client, + indicesService.indexService(params.shardId().getIndex()), + params.shardId(), + params.downsampleIndex(), + params.downsampleConfig(), + params.metrics(), + params.labels(), + initialState + ); + try { + downsampleShardIndexer.execute(); + task.markAsCompleted(); + } catch (final DownsampleShardIndexerException e) { + if (e.isRetriable()) { + LOGGER.warn("Downsampling task [" + task.getPersistentTaskId() + " retriable failure [" + e.getMessage() + "]"); + task.markAsLocallyAborted(e.getMessage()); + } else { + LOGGER.error( + "Downsampling task [" + task.getPersistentTaskId() + " non retriable failure [" + e.getMessage() + "]" + ); + markAsFailed(task, e); + } + } catch (final Exception e) { + LOGGER.error("Downsampling task [" + task.getPersistentTaskId() + " non-retriable failure [" + e.getMessage() + "]"); + markAsFailed(task, e); + } + } + }); + } + + private static void markAsFailed(AllocatedPersistentTask task, Exception e) { + task.updatePersistentTaskState( + new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.FAILED, null), + ActionListener.running(() -> task.markAsFailed(e)) + ); + } + + // This is needed for FLS/DLS to work correctly. The _indices_permissions in the thread local aren't set if an searcher is acquired + // directly from this persistent task executor. By delegating to this action (with a request that implements IndicesRequest) the + // security thread local will be setup correctly so that we avoid this error: + // org.elasticsearch.ElasticsearchSecurityException: no indices permissions found + public static class DelegatingAction extends ActionType { + + public static final DelegatingAction INSTANCE = new DelegatingAction(); + public static final String NAME = "indices:data/read/dummy"; + + private DelegatingAction() { + super(NAME, in -> new ActionResponse.Empty()); + } + + public static class Request extends ActionRequest implements IndicesRequest { + + private final DownsampleShardTask task; + private final SearchHit[] lastDownsampleTsidHits; + private final DownsampleShardTaskParams params; + + public Request(DownsampleShardTask task, SearchHit[] lastDownsampleTsidHits, DownsampleShardTaskParams params) { + this.task = task; + this.lastDownsampleTsidHits = lastDownsampleTsidHits; + this.params = params; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public String[] indices() { + return new String[] { params.shardId().getIndexName() }; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.STRICT_EXPAND_OPEN; + } + + @Override + public void writeTo(StreamOutput out) { + throw new IllegalStateException("request should stay local"); + } + } + + public static class TA extends TransportAction { + + private final Client client; + private final IndicesService indicesService; + + @Inject + public TA(TransportService transportService, ActionFilters actionFilters, Client client, IndicesService indicesService) { + super(NAME, actionFilters, transportService.getTaskManager()); + this.client = client; + this.indicesService = indicesService; + } + + @Override + protected void doExecute(Task t, Request request, ActionListener listener) { + realNodeOperation(client, indicesService, request.task, request.params, request.lastDownsampleTsidHits); + listener.onResponse(ActionResponse.Empty.INSTANCE); + } + } } } diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardTaskParams.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardTaskParams.java index a49ef3c306fe8..2962ed9e3ae3d 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardTaskParams.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardTaskParams.java @@ -104,18 +104,6 @@ public void writeTo(StreamOutput out) throws IOException { out.writeStringArray(labels); } - public static DownsampleShardTaskParams readFromStream(final StreamInput in) throws IOException { - return new DownsampleShardTaskParams( - new DownsampleConfig(in), - in.readString(), - in.readVLong(), - in.readVLong(), - new ShardId(in), - in.readStringArray(), - in.readStringArray() - ); - } - public static DownsampleShardTaskParams fromXContent(XContentParser parser) throws IOException { final DownsampleShardTaskParams.Builder builder = new DownsampleShardTaskParams.Builder(); PARSER.parse(parser, builder, null);