diff --git a/docs/changelog/99480.yaml b/docs/changelog/99480.yaml new file mode 100644 index 0000000000000..08dcdceca60b0 --- /dev/null +++ b/docs/changelog/99480.yaml @@ -0,0 +1,6 @@ +pr: 99480 +summary: Fix deadlock between Cache.put and Cache.invalidateAll +area: Infra/Core +type: bug +issues: + - 99326 diff --git a/docs/changelog/99515.yaml b/docs/changelog/99515.yaml new file mode 100644 index 0000000000000..7de237531a506 --- /dev/null +++ b/docs/changelog/99515.yaml @@ -0,0 +1,5 @@ +pr: 99515 +summary: Add `IndexVersion` to node info +area: Infra/REST API +type: enhancement +issues: [] diff --git a/docs/changelog/99567.yaml b/docs/changelog/99567.yaml new file mode 100644 index 0000000000000..aea65e55b6ee2 --- /dev/null +++ b/docs/changelog/99567.yaml @@ -0,0 +1,6 @@ +pr: 99567 +summary: Make tsdb settings public in Serverless +area: TSDB +type: bug +issues: + - 99563 diff --git a/docs/reference/cluster/nodes-info.asciidoc b/docs/reference/cluster/nodes-info.asciidoc index e2dbffcd9705a..00650ce948b6f 100644 --- a/docs/reference/cluster/nodes-info.asciidoc +++ b/docs/reference/cluster/nodes-info.asciidoc @@ -136,6 +136,9 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=node-id] `transport_version`:: The most recent transport version that this node can communicate with. +`index_version`:: + The most recent index version that this node can read. + The `os` flag can be set to retrieve information that concern the operating system: @@ -232,6 +235,7 @@ The API returns the following response: "ip": "192.168.17", "version": "{version}", "transport_version": 100000298, + "index_version": 100000074, "build_flavor": "default", "build_type": "{build_type}", "build_hash": "587409e", @@ -271,6 +275,7 @@ The API returns the following response: // TESTRESPONSE[s/"host": "node-0.elastic.co"/"host": $body.$_path/] // TESTRESPONSE[s/"ip": "192.168.17"/"ip": $body.$_path/] // TESTRESPONSE[s/"transport_version": 100000298/"transport_version": $body.$_path/] +// TESTRESPONSE[s/"index_version": 100000074/"index_version": $body.$_path/] // TESTRESPONSE[s/"build_hash": "587409e"/"build_hash": $body.$_path/] // TESTRESPONSE[s/"roles": \[[^\]]*\]/"roles": $body.$_path/] // TESTRESPONSE[s/"attributes": \{[^\}]*\}/"attributes": $body.$_path/] @@ -305,6 +310,7 @@ The API returns the following response: "ip": "192.168.17", "version": "{version}", "transport_version": 100000298, + "index_version": 100000074, "build_flavor": "default", "build_type": "{build_type}", "build_hash": "587409e", @@ -368,6 +374,7 @@ The API returns the following response: // TESTRESPONSE[s/"host": "node-0.elastic.co"/"host": $body.$_path/] // TESTRESPONSE[s/"ip": "192.168.17"/"ip": $body.$_path/] // TESTRESPONSE[s/"transport_version": 100000298/"transport_version": $body.$_path/] +// TESTRESPONSE[s/"index_version": 100000074/"index_version": $body.$_path/] // TESTRESPONSE[s/"build_hash": "587409e"/"build_hash": $body.$_path/] // TESTRESPONSE[s/"roles": \[[^\]]*\]/"roles": $body.$_path/] // TESTRESPONSE[s/"attributes": \{[^\}]*\}/"attributes": $body.$_path/] diff --git a/docs/reference/migration/migrate_8_10.asciidoc b/docs/reference/migration/migrate_8_10.asciidoc index 84600fd96bd69..a1d132812ad03 100644 --- a/docs/reference/migration/migrate_8_10.asciidoc +++ b/docs/reference/migration/migrate_8_10.asciidoc @@ -9,11 +9,81 @@ your application to {es} 8.10. See also <> and <>. -coming::[8.10.0] - - [discrete] [[breaking-changes-8.10]] === Breaking changes -There are no breaking changes in {es} 8.10. 
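The first changelog above, `Fix deadlock between Cache.put and Cache.invalidateAll` (#99480), is a lock-ordering fix; the actual change to `Cache.invalidateAll` appears further down in this diff. As a generic illustration only (not the PR's code, names invented), the sketch below shows the hazard and the cure: every path that needs both locks must acquire them in the same global order.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Generic sketch of the deadlock hazard: two code paths taking the same pair of locks in
// opposite orders can each end up holding one lock while waiting for the other.
// The cure is a single, agreed acquisition order (here: lruLock before segmentLock).
class LockOrderingExample {
    private final Lock lruLock = new ReentrantLock();
    private final Lock segmentLock = new ReentrantLock();

    void pathA(Runnable work) {
        lruLock.lock();              // 1. shared LRU lock first
        try {
            segmentLock.lock();      // 2. then the per-segment lock
            try {
                work.run();
            } finally {
                segmentLock.unlock();
            }
        } finally {
            lruLock.unlock();
        }
    }

    // Any other path that needs both locks must follow the same order; taking
    // segmentLock first here would reintroduce the deadlock.
    void pathB(Runnable work) {
        pathA(work);
    }
}
```

In the `Cache` change later in this diff, `invalidateAll` switches to acquiring `lruLock` before the per-segment write locks, so the two methods no longer take the locks in opposite orders.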
+The following changes in {es} 8.10 might affect your applications
+and prevent them from operating normally.
+Before upgrading to 8.10, review these changes and take the described steps
+to mitigate the impact.
+
+
+While there are no notable breaking changes in {es} 8.10,
+there are some less critical breaking changes.
+
+[discrete]
+[[breaking_810_cluster_and_node_setting_changes]]
+==== Cluster and node setting changes
+
+[[remove_unused_executor_builder_for_vector_tile_plugin]]
+.Remove the unused executor builder for vector tile plugin
+[%collapsible]
+====
+*Details* +
+The thread pool named `vectortile` is a leftover from the original development of the vector tile search endpoint and is no longer used. Removing it is still a breaking change if the thread pool is configured in the `elasticsearch.yml` file, for example by setting the thread pool size with `thread_pool.vectortile.size=8`.
+
+*Impact* +
+If the thread pool is configured in the `elasticsearch.yml` file, Elasticsearch will not start until those lines are removed.
+====
+
+[discrete]
+[[breaking_810_java_api_changes]]
+==== Java API changes
+
+[[change_pre_configured_cached_analyzer_components_to_use_indexversion_instead_of_version-highlight]]
+.Change pre-configured and cached analyzer components to use IndexVersion instead of Version
+[%collapsible]
+====
+*Details* +
+The types used to obtain pre-configured components have changed from Version to IndexVersion,
+with corresponding changes to method names.
+
+Prior to 8.10, there was a one-to-one mapping between node version and index version, with corresponding constants
+in the IndexVersion class.
+Starting in 8.10, IndexVersion is versioned independently of node version and is a simple incrementing number.
+For more information on how to use IndexVersion and other version types, please see the contributing guide.
+
+*Impact* +
+Analysis components now take IndexVersion instead of Version.
+====
+
+
+[discrete]
+[[deprecated-8.10]]
+=== Deprecations
+
+The following functionality has been deprecated in {es} 8.10
+and will be removed in a future version.
+While this won't have an immediate impact on your applications,
+we strongly encourage you to take the described steps to update your code
+after upgrading to 8.10.
+
+To find out if you are using any deprecated functionality,
+enable <>.
+
+[discrete]
+[[deprecations_810_authorization]]
+==== Authorization deprecations
+
+[[mark_apm_user_for_removal_in_future_major_release]]
+.Mark `apm_user` for removal in a future major release
+[%collapsible]
+====
+*Details* +
+The `apm_user` role has been deprecated and will be removed in a future major release. Users should migrate to the `editor` and `viewer` roles.
+
+*Impact* +
+Users will have to migrate to the `editor` and `viewer` roles.
+====
+
diff --git a/docs/reference/release-notes/8.10.0.asciidoc b/docs/reference/release-notes/8.10.0.asciidoc
index cc24df451ed94..ea0af6c485f25 100644
--- a/docs/reference/release-notes/8.10.0.asciidoc
+++ b/docs/reference/release-notes/8.10.0.asciidoc
@@ -1,8 +1,255 @@
 [[release-notes-8.10.0]]
 == {es} version 8.10.0
-coming[8.10.0]
-
 Also see <>.
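The Java API note in the migration guide above describes decoupling IndexVersion from the node Version. For orientation only (this is not code from the PR, and the class name is invented), the sketch below contrasts the pre-8.10 habit of deriving an index version from the node version with the 8.10+ model of an independently incrementing `IndexVersion`; it uses only methods that appear elsewhere in this diff (`IndexVersion.fromId`, `IndexVersion.current`, `id`).

```java
import org.elasticsearch.Version;
import org.elasticsearch.index.IndexVersion;

// Illustration of the decoupling described in the migration note above.
class IndexVersionExample {
    static void demo() {
        // Pre-8.10 assumption: the index version could be derived from the node version.
        IndexVersion derivedFromNode = IndexVersion.fromId(Version.CURRENT.id);
        // From 8.10 on: IndexVersion increments independently, so pass it around explicitly.
        IndexVersion current = IndexVersion.current();
        System.out.println("derived=" + derivedFromNode.id() + ", current=" + current.id());
    }
}
```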
+[[breaking-8.10.0]] +[float] +=== Breaking changes + +Analysis:: +* Change pre-configured and cached analyzer components to use IndexVersion instead of Version {es-pull}97319[#97319] + +Geo:: +* Remove the unused executor builder for vector tile plugin {es-pull}96577[#96577] + +[[bug-8.10.0]] +[float] +=== Bug fixes + +Aggregations:: +* Cardinality nested in time series doc values bug {es-pull}99007[#99007] +* Skip segment for `MatchNoDocsQuery` filters {es-pull}98295[#98295] (issue: {es-issue}94637[#94637]) + +Allocation:: +* Do not assign ignored shards {es-pull}98265[#98265] +* Remove exception wrapping in `BatchedRerouteService` {es-pull}97224[#97224] + +Application:: +* [Profiling] Abort index creation on outdated index {es-pull}98864[#98864] +* [Profiling] Consider static settings in status {es-pull}97890[#97890] +* [Profiling] Mark executables without a name {es-pull}98884[#98884] + +CRUD:: +* Add missing sync on `indicesThatCannotBeCreated` {es-pull}97869[#97869] + +Cluster Coordination:: +* Fix cluster bootstrap warning for single-node discovery {es-pull}96895[#96895] (issue: {es-issue}96874[#96874]) +* Fix election scheduling after discovery outage {es-pull}98420[#98420] +* Improve reliability of elections with message delays {es-pull}98354[#98354] (issue: {es-issue}97909[#97909]) +* Make `TransportAddVotingConfigExclusionsAction` retryable {es-pull}98386[#98386] +* Release master service task on timeout {es-pull}97711[#97711] + +Data streams:: +* Avoid lifecycle NPE in the data stream lifecycle usage API {es-pull}98260[#98260] + +Distributed:: +* Avoid `transport_worker` thread in `TransportBroadcastAction` {es-pull}98001[#98001] +* Avoid `transport_worker` thread in `TransportBroadcastByNodeAction` {es-pull}97920[#97920] (issue: {es-issue}97914[#97914]) +* Fork response reading in `TransportNodesAction` {es-pull}97899[#97899] + +Downsampling:: +* Copy "index.lifecycle.name" for ILM managed indices {es-pull}97110[#97110] (issue: {es-issue}96732[#96732]) +* Downsampling: copy the `_tier_preference` setting {es-pull}96982[#96982] (issue: {es-issue}96733[#96733]) + +EQL:: +* Fix async missing events {es-pull}97718[#97718] (issue: {es-issue}97644[#97644]) + +Geo:: +* Fix how Maps#flatten handle map values inside a list {es-pull}98828[#98828] +* Fix mvt error when returning partial results {es-pull}98765[#98765] (issue: {es-issue}98730[#98730]) + +Health:: +* `_health_report` SLM indicator should use the policy ID (not the name) {es-pull}99111[#99111] + +Indices APIs:: +* Ensure frozen indices have correct tier preference {es-pull}97967[#97967] + +Infra/REST API:: +* Fix possible NPE when transportversion is null in `MainResponse` {es-pull}97203[#97203] + +Ingest Node:: +* Revert "Add mappings for enrich fields" {es-pull}98683[#98683] + +Machine Learning:: +* Avoid risk of OOM in datafeeds when memory is constrained {es-pull}98324[#98324] (issue: {es-issue}89769[#89769]) +* Detect infinite loop in the WordPiece tokenizer {es-pull}98206[#98206] +* Fix to stop aggregatable subobjects from being considered multi-fields, to support `"subobjects": false` in data frame analytics {es-pull}97705[#97705] (issue: {es-issue}88605[#88605]) +* Fix weird `change_point` bug where all data values are equivalent {es-pull}97588[#97588] +* The model loading service should not notify listeners in a sync block {es-pull}97142[#97142] + +Mapping:: +* Fix `fields` API with `subobjects: false` {es-pull}97092[#97092] (issue: {es-issue}96700[#96700]) + +Network:: +* Fork remote-cluster response handling 
{es-pull}97922[#97922] + +Search:: +* Fork CCS remote-cluster responses {es-pull}98124[#98124] (issue: {es-issue}97997[#97997]) +* Fork CCS search-shards handling {es-pull}98209[#98209] +* Improve test coverage for CCS search cancellation and fix response bugs {es-pull}97029[#97029] +* Make `terminate_after` early termination friendly {es-pull}97540[#97540] (issue: {es-issue}97269[#97269]) +* Track `max_score` in collapse when requested {es-pull}97703[#97703] (issue: {es-issue}97653[#97653]) + +Security:: +* Fix NPE when `GetUser` with profile uid before profile index exists {es-pull}98961[#98961] + +Snapshot/Restore:: +* Fix `BlobCacheBufferedIndexInput` large read after clone {es-pull}98970[#98970] + +TSDB:: +* Mapped field types searchable with doc values {es-pull}97724[#97724] + +Transform:: +* Fix transform incorrectly calculating date bucket on updating old data {es-pull}97401[#97401] (issue: {es-issue}97101[#97101]) + +Watcher:: +* Changing watcher to disable cookies in shared http client {es-pull}97591[#97591] + +[[deprecation-8.10.0]] +[float] +=== Deprecations + +Authorization:: +* Mark `apm_user` for removal in a future major release {es-pull}87674[#87674] + +[[enhancement-8.10.0]] +[float] +=== Enhancements + +Aggregations:: +* Improve error message when aggregation doesn't support counter field {es-pull}93545[#93545] +* Set default index mode for `TimeSeries` to `null` {es-pull}98808[#98808] (issue: {es-issue}97429[#97429]) + +Allocation:: +* Add `node.roles` to cat allocation API {es-pull}96994[#96994] + +Application:: +* [Profiling] Add initial support for upgrades {es-pull}97380[#97380] +* [Profiling] Support index migrations {es-pull}97773[#97773] + +Authentication:: +* Avoid double get {es-pull}98067[#98067] (issue: {es-issue}97928[#97928]) +* Give all acces to .slo-observability.* indice to kibana user {es-pull}97539[#97539] +* Refresh tokens without search {es-pull}97395[#97395] + +Authorization:: +* Add "operator" field to authenticate response {es-pull}97234[#97234] +* Read operator privs enabled from Env settings {es-pull}98246[#98246] +* [Fleet] Allow `kibana_system` to put datastream lifecycle {es-pull}97732[#97732] + +Data streams:: +* Install data stream template for Kibana reporting {es-pull}97765[#97765] + +Downsampling:: +* Change `MetricFieldProducer#metrics` field type from list to array {es-pull}97344[#97344] +* Improve iterating over many field producers during downsample operation {es-pull}97281[#97281] +* Run downsampling using persistent tasks {es-pull}97557[#97557] (issue: {es-issue}93582[#93582]) + +EQL:: +* EQL to use only the necessary fields in the internal `field_caps` calls {es-pull}98987[#98987] + +Engine:: +* Fix edge case for active flag for flush on idle {es-pull}97332[#97332] (issue: {es-issue}97154[#97154]) + +Health:: +* Adding special logic to the disk health check for search-only nodes {es-pull}98508[#98508] +* Health API Periodic Logging {es-pull}96772[#96772] + +ILM+SLM:: +* Separating SLM from ILM {es-pull}98184[#98184] + +Infra/Core:: +* Infrastructure to report upon document parsing {es-pull}97961[#97961] + +Infra/Node Lifecycle:: +* Check ILM status before reporting node migration STALLED {es-pull}98367[#98367] (issue: {es-issue}89486[#89486]) + +Infra/Plugins:: +* Adding `ApiFilteringActionFilter` {es-pull}97985[#97985] + +Infra/REST API:: +* Enable Serverless API protections dynamically {es-pull}97079[#97079] +* Make `RestController` pluggable {es-pull}98187[#98187] + +Infra/Settings:: +* Mark customer settings for serverless 
{es-pull}98051[#98051] + +Ingest Node:: +* Allow custom geo ip database files to be downloaded {es-pull}97850[#97850] + +Network:: +* Add request header size limit for RCS transport connections {es-pull}98692[#98692] + +Search:: +* Add `completion_time` time field to `async_search` get and status response {es-pull}97700[#97700] (issue: {es-issue}88640[#88640]) +* Add setting for search parallelism {es-pull}98455[#98455] +* Add support for concurrent collection when size is greater than zero {es-pull}98425[#98425] +* Cross-cluster search provides details about search on each cluster {es-pull}97731[#97731] +* Enable parallel collection in Dfs phase {es-pull}97416[#97416] +* Exclude clusters from a cross-cluster search {es-pull}97865[#97865] +* Improve MatchNoDocsQuery description {es-pull}96069[#96069] (issue: {es-issue}95741[#95741]) +* Improve exists query rewrite {es-pull}97159[#97159] +* Improve match query rewrite {es-pull}97208[#97208] +* Improve prefix query rewrite {es-pull}97209[#97209] +* Improve wildcard query and terms query rewrite {es-pull}97594[#97594] +* Introduce Synonyms Management API used for synonym and synonym_graph filters {es-pull}97962[#97962] (issue: {es-issue}38523[#38523]) +* Introduce a collector manager for `PartialHitCountCollector` {es-pull}97550[#97550] +* Introduce a collector manager for `QueryPhaseCollector` {es-pull}97410[#97410] +* Limit `_terms_enum` prefix size {es-pull}97488[#97488] (issue: {es-issue}96572[#96572]) +* Support minimum_should_match field for terms_set query {es-pull}96082[#96082] +* Support type for simple query string {es-pull}96717[#96717] +* Unwrap IOException in `ContextIndexSearcher` concurrent code-path {es-pull}98459[#98459] +* Use a collector manager in DfsPhase Knn Search {es-pull}96689[#96689] +* Use the Weight#matches mode for highlighting by default {es-pull}96068[#96068] +* Wire `QueryPhaseCollectorManager` into the query phase {es-pull}97726[#97726] +* Wire concurrent top docs collector managers when size is 0 {es-pull}97755[#97755] +* `ProfileCollectorManager` to support child profile collectors {es-pull}97387[#97387] +* cleanup some code NoriTokenizerFactory and KuromojiTokenizerFactory {es-pull}92574[#92574] + +Security:: +* Add an API for managing the settings of Security system indices {es-pull}97630[#97630] +* Support getting active-only API keys via Get API keys API {es-pull}98259[#98259] (issue: {es-issue}97995[#97995]) + +Snapshot/Restore:: +* Add Setting to optionally use mmap for shared cache IO {es-pull}97581[#97581] +* Collect additional object store stats for S3 {es-pull}98083[#98083] +* HDFS plugin add replication_factor param {es-pull}94132[#94132] + +Store:: +* Allow Lucene directory implementations to estimate their size {es-pull}97822[#97822] +* Allow `ByteSizeDirectory` to expose their data set sizes {es-pull}98085[#98085] + +TSDB:: +* Add tsdb metrics builtin component template {es-pull}97602[#97602] +* Include more downsampling status statistics {es-pull}96930[#96930] (issue: {es-issue}96760[#96760]) +* `TimeSeriesIndexSearcher` to offload to the provided executor {es-pull}98414[#98414] + +Transform:: +* Support boxplot aggregation in transform {es-pull}96515[#96515] + +[[feature-8.10.0]] +[float] +=== New features + +Application:: +* Enable Query Rules as technical preview {es-pull}97466[#97466] +* [Enterprise Search] Add connectors indices and ent-search pipeline {es-pull}97463[#97463] + +Data streams:: +* Introduce downsampling configuration for data stream lifecycle {es-pull}97041[#97041] + 
+Search:: +* Introduce executor for concurrent search {es-pull}98204[#98204] + +Security:: +* Beta release for API key based cross-cluster access {es-pull}98307[#98307] + +[[upgrade-8.10.0]] +[float] +=== Upgrades + +Network:: +* Upgrade Netty to 4.1.94.Final {es-pull}97040[#97040] + diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java index cd221ada7a4dc..313a6dd459668 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java @@ -102,7 +102,8 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin { TimeValue.timeValueMinutes(1), TimeValue.timeValueDays(7), Setting.Property.IndexScope, - Setting.Property.Dynamic + Setting.Property.Dynamic, + Setting.Property.ServerlessPublic ); public static final String LIFECYCLE_CUSTOM_INDEX_METADATA_KEY = "data_stream_lifecycle"; public static final Setting LOOK_BACK_TIME = Setting.timeSetting( @@ -111,7 +112,8 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin { TimeValue.timeValueMinutes(1), TimeValue.timeValueDays(7), Setting.Property.IndexScope, - Setting.Property.Dynamic + Setting.Property.Dynamic, + Setting.Property.ServerlessPublic ); // The dependency of index.look_ahead_time is a cluster setting and currently there is no clean validation approach for this: private final SetOnce updateTimeSeriesRangeService = new SetOnce<>(); diff --git a/qa/os/src/test/java/org/elasticsearch/packaging/util/Shell.java b/qa/os/src/test/java/org/elasticsearch/packaging/util/Shell.java index fc69d0de71a26..d06110deb9ef0 100644 --- a/qa/os/src/test/java/org/elasticsearch/packaging/util/Shell.java +++ b/qa/os/src/test/java/org/elasticsearch/packaging/util/Shell.java @@ -202,7 +202,7 @@ private Result runScriptIgnoreExitCode(String[] command) { private String readFileIfExists(Path path) throws IOException { if (Files.exists(path)) { long size = Files.size(path); - final int maxFileSize = 100 * 1024; + final int maxFileSize = 1024 * 1024; if (size > maxFileSize) { // file is really big, truncate try (var br = Files.newBufferedReader(path, StandardCharsets.UTF_8)) { diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 844931dee9eb4..f1d5bc6f02a7c 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -136,6 +136,7 @@ static TransportVersion def(int id) { public static final TransportVersion SHARD_SIZE_PRIMARY_TERM_GEN_ADDED = def(8_500_072); public static final TransportVersion COMPAT_VERSIONS_MAPPING_VERSION_ADDED = def(8_500_073); public static final TransportVersion V_8_500_074 = def(8_500_074); + public static final TransportVersion NODE_INFO_INDEX_VERSION_ADDED = def(8_500_075); /* * STOP! READ THIS FIRST! 
No, really, * ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _ diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java index 35dc876b3a585..3086f22fae8bc 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java @@ -20,6 +20,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.Nullable; import org.elasticsearch.http.HttpInfo; +import org.elasticsearch.index.IndexVersion; import org.elasticsearch.ingest.IngestInfo; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.monitor.os.OsInfo; @@ -41,6 +42,7 @@ public class NodeInfo extends BaseNodeResponse { private final Version version; private final TransportVersion transportVersion; + private final IndexVersion indexVersion; private final Build build; @Nullable @@ -64,6 +66,11 @@ public NodeInfo(StreamInput in) throws IOException { } else { transportVersion = TransportVersion.fromId(version.id); } + if (in.getTransportVersion().onOrAfter(TransportVersions.NODE_INFO_INDEX_VERSION_ADDED)) { + indexVersion = IndexVersion.readVersion(in); + } else { + indexVersion = IndexVersion.fromId(version.id); + } build = Build.readBuild(in); if (in.readBoolean()) { totalIndexingBuffer = ByteSizeValue.ofBytes(in.readLong()); @@ -94,6 +101,7 @@ public NodeInfo(StreamInput in) throws IOException { public NodeInfo( Version version, TransportVersion transportVersion, + IndexVersion indexVersion, Build build, DiscoveryNode node, @Nullable Settings settings, @@ -112,6 +120,7 @@ public NodeInfo( super(node); this.version = version; this.transportVersion = transportVersion; + this.indexVersion = indexVersion; this.build = build; this.settings = settings; addInfoIfNonNull(OsInfo.class, os); @@ -149,6 +158,13 @@ public TransportVersion getTransportVersion() { return transportVersion; } + /** + * The most recent index version that can be used by this node + */ + public IndexVersion getIndexVersion() { + return indexVersion; + } + /** * The build version of the node. 
*/ @@ -200,6 +216,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0)) { TransportVersion.writeVersion(transportVersion, out); } + if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_INFO_INDEX_VERSION_ADDED)) { + IndexVersion.writeVersion(indexVersion, out); + } Build.writeBuild(build, out); if (totalIndexingBuffer == null) { out.writeBoolean(false); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java index 2b97ee38daa9f..68769af5a17d9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java @@ -65,6 +65,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("version", nodeInfo.getVersion()); builder.field("transport_version", nodeInfo.getTransportVersion().id()); + builder.field("index_version", nodeInfo.getIndexVersion().id()); builder.field("build_flavor", nodeInfo.getBuild().flavor()); builder.field("build_type", nodeInfo.getBuild().type().displayName()); builder.field("build_hash", nodeInfo.getBuild().hash()); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index ab67478192c11..9709e149b28d1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -493,7 +493,8 @@ public Iterator> settings() { public static final Setting> INDEX_ROUTING_PATH = Setting.stringListSetting( "index.routing_path", Setting.Property.IndexScope, - Setting.Property.Final + Setting.Property.Final, + Property.ServerlessPublic ); /** diff --git a/server/src/main/java/org/elasticsearch/common/cache/Cache.java b/server/src/main/java/org/elasticsearch/common/cache/Cache.java index bb4bd0062b8db..7cd6fa471040a 100644 --- a/server/src/main/java/org/elasticsearch/common/cache/Cache.java +++ b/server/src/main/java/org/elasticsearch/common/cache/Cache.java @@ -521,12 +521,12 @@ public void invalidateAll() { Entry h; boolean[] haveSegmentLock = new boolean[NUMBER_OF_SEGMENTS]; - try { - for (int i = 0; i < NUMBER_OF_SEGMENTS; i++) { - segments[i].segmentLock.writeLock().lock(); - haveSegmentLock[i] = true; - } - try (ReleasableLock ignored = lruLock.acquire()) { + try (ReleasableLock ignored = lruLock.acquire()) { + try { + for (int i = 0; i < NUMBER_OF_SEGMENTS; i++) { + segments[i].segmentLock.writeLock().lock(); + haveSegmentLock[i] = true; + } h = head; for (CacheSegment segment : segments) { segment.map = null; @@ -539,11 +539,11 @@ public void invalidateAll() { head = tail = null; count = 0; weight = 0; - } - } finally { - for (int i = NUMBER_OF_SEGMENTS - 1; i >= 0; i--) { - if (haveSegmentLock[i]) { - segments[i].segmentLock.writeLock().unlock(); + } finally { + for (int i = NUMBER_OF_SEGMENTS - 1; i >= 0; i--) { + if (haveSegmentLock[i]) { + segments[i].segmentLock.writeLock().unlock(); + } } } } diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 3a2b01f5cc9c4..d887ed8d1531d 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ 
b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -590,7 +590,8 @@ public Iterator> settings() { Instant.ofEpochMilli(DateUtils.MAX_MILLIS_BEFORE_MINUS_9999), v -> {}, Property.IndexScope, - Property.Final + Property.Final, + Property.ServerlessPublic ); /** @@ -619,7 +620,8 @@ public Iterator> settings() { } }, Property.IndexScope, - Property.Dynamic + Property.Dynamic, + Property.ServerlessPublic ); public static final Setting TIME_SERIES_ES87TSDB_CODEC_ENABLED_SETTING = Setting.boolSetting( @@ -658,7 +660,8 @@ public Iterator> settings() { } }, Property.IndexScope, - Property.Final + Property.Final, + Property.ServerlessPublic ); /** diff --git a/server/src/main/java/org/elasticsearch/node/NodeService.java b/server/src/main/java/org/elasticsearch/node/NodeService.java index 1bb039ecfbe26..f71fada3c046a 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeService.java +++ b/server/src/main/java/org/elasticsearch/node/NodeService.java @@ -22,6 +22,7 @@ import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.http.HttpServerTransport; +import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -114,6 +115,7 @@ public NodeInfo info( return new NodeInfo( Version.CURRENT, TransportVersion.current(), + IndexVersion.current(), Build.current(), transportService.getLocalNode(), settings ? settingsFilter.filter(this.settings) : null, diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfoTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfoTests.java index 1aa2eb16c0309..35f17c24b3e89 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfoTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfoTests.java @@ -38,6 +38,7 @@ public void testGetInfo() { NodeInfo nodeInfo = new NodeInfo( Version.CURRENT, TransportVersion.current(), + IndexVersion.current(), Build.current(), DiscoveryNodeUtils.builder("test_node") .roles(emptySet()) diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/remote/RemoteClusterNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/remote/RemoteClusterNodesActionTests.java index c75d79f1ea18e..863a57a42c2bc 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/remote/RemoteClusterNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/remote/RemoteClusterNodesActionTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.index.IndexVersion; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -78,6 +79,7 @@ public void testDoExecuteForRemoteServerNodes() { new NodeInfo( Version.CURRENT, TransportVersion.current(), + IndexVersion.current(), null, node, null, @@ -147,6 +149,7 @@ public void testDoExecuteForRemoteNodes() { new NodeInfo( Version.CURRENT, TransportVersion.current(), + IndexVersion.current(), null, node, null, diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java 
b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java index 060683818b9a7..80c320493f4ac 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Strings; +import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.stats.IndexingPressureStats; import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.test.ESTestCase; @@ -323,6 +324,7 @@ private static NodeInfo createNodeInfo(String nodeId, String transportType, Stri return new NodeInfo( Version.CURRENT, TransportVersion.current(), + IndexVersion.current(), Build.current(), DiscoveryNodeUtils.create(nodeId, buildNewFakeTransportAddress()), settings.build(), diff --git a/server/src/test/java/org/elasticsearch/action/ingest/ReservedPipelineActionTests.java b/server/src/test/java/org/elasticsearch/action/ingest/ReservedPipelineActionTests.java index 520d60a4dfbf6..05b35f992564f 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/ReservedPipelineActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/ReservedPipelineActionTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.index.IndexVersion; import org.elasticsearch.ingest.FakeProcessor; import org.elasticsearch.ingest.IngestInfo; import org.elasticsearch.ingest.IngestService; @@ -101,6 +102,7 @@ public void setup() { NodeInfo nodeInfo = new NodeInfo( Version.CURRENT, TransportVersion.current(), + IndexVersion.current(), Build.current(), discoveryNode, Settings.EMPTY, diff --git a/server/src/test/java/org/elasticsearch/cluster/service/TransportVersionsFixupListenerTests.java b/server/src/test/java/org/elasticsearch/cluster/service/TransportVersionsFixupListenerTests.java index f213d7e366ce4..3c8540c7771c6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/TransportVersionsFixupListenerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/TransportVersionsFixupListenerTests.java @@ -86,6 +86,7 @@ private static NodesInfoResponse getResponse(Map respo null, e.getValue(), null, + null, DiscoveryNodeUtils.create(e.getKey(), new TransportAddress(TransportAddress.META_ADDRESS, 9200)), null, null, diff --git a/server/src/test/java/org/elasticsearch/nodesinfo/NodeInfoStreamingTests.java b/server/src/test/java/org/elasticsearch/nodesinfo/NodeInfoStreamingTests.java index f48cf67bf5466..8a338333065d7 100644 --- a/server/src/test/java/org/elasticsearch/nodesinfo/NodeInfoStreamingTests.java +++ b/server/src/test/java/org/elasticsearch/nodesinfo/NodeInfoStreamingTests.java @@ -73,6 +73,8 @@ private void assertExpectedUnchanged(NodeInfo nodeInfo, NodeInfo readNodeInfo) t assertThat(nodeInfo.getBuild().toString(), equalTo(readNodeInfo.getBuild().toString())); assertThat(nodeInfo.getHostname(), equalTo(readNodeInfo.getHostname())); assertThat(nodeInfo.getVersion(), equalTo(readNodeInfo.getVersion())); + assertThat(nodeInfo.getTransportVersion(), equalTo(readNodeInfo.getTransportVersion())); + assertThat(nodeInfo.getIndexVersion(), equalTo(readNodeInfo.getIndexVersion())); 
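The streaming test above checks that the new index version survives a serialization round trip. For readers skimming the diff, this is the wire-compatibility pattern the `NodeInfo` change uses, condensed into a standalone sketch; the helper class and method names are invented, but every call appears in the PR.

```java
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.IndexVersion;

import java.io.IOException;

// Condensed sketch of the version-gated (de)serialization NodeInfo uses in this PR:
// the field is only written or read when the peer's transport version includes
// NODE_INFO_INDEX_VERSION_ADDED; older peers fall back to deriving the index version
// from the node version id.
class IndexVersionWireExample {
    static IndexVersion read(StreamInput in, int legacyNodeVersionId) throws IOException {
        if (in.getTransportVersion().onOrAfter(TransportVersions.NODE_INFO_INDEX_VERSION_ADDED)) {
            return IndexVersion.readVersion(in);         // peer sent the field explicitly
        }
        return IndexVersion.fromId(legacyNodeVersionId); // older peer: derive from node version
    }

    static void write(StreamOutput out, IndexVersion indexVersion) throws IOException {
        if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_INFO_INDEX_VERSION_ADDED)) {
            IndexVersion.writeVersion(indexVersion, out); // only newer peers understand the field
        }
    }
}
```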
compareJsonOutput(nodeInfo.getInfo(HttpInfo.class), readNodeInfo.getInfo(HttpInfo.class)); compareJsonOutput(nodeInfo.getInfo(RemoteClusterServerInfo.class), readNodeInfo.getInfo(RemoteClusterServerInfo.class)); compareJsonOutput(nodeInfo.getInfo(JvmInfo.class), readNodeInfo.getInfo(JvmInfo.class)); @@ -230,6 +232,7 @@ private static NodeInfo createNodeInfo() { return new NodeInfo( VersionUtils.randomVersion(random()), TransportVersionUtils.randomVersion(random()), + IndexVersionUtils.randomVersion(random()), build, node, settings, diff --git a/server/src/test/java/org/elasticsearch/rest/action/cat/RestPluginsActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/cat/RestPluginsActionTests.java index 5d26036769a35..5249fe6aec9f0 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/cat/RestPluginsActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/cat/RestPluginsActionTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Table; +import org.elasticsearch.index.IndexVersion; import org.elasticsearch.plugins.PluginDescriptor; import org.elasticsearch.plugins.PluginRuntimeInfo; import org.elasticsearch.rest.RestRequest; @@ -64,6 +65,7 @@ private Table buildTable(List pluginDescriptor) { new NodeInfo( Version.CURRENT, TransportVersion.current(), + IndexVersion.current(), null, node(i), null, diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java index c0c7de657eafe..673b05c7c2466 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.unit.Processors; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexVersion; import org.elasticsearch.monitor.os.OsInfo; import org.elasticsearch.monitor.os.OsStats; import org.elasticsearch.test.client.NoOpClient; @@ -450,6 +451,7 @@ private static org.elasticsearch.action.admin.cluster.node.info.NodeInfo infoFor return new org.elasticsearch.action.admin.cluster.node.info.NodeInfo( Version.CURRENT, TransportVersion.current(), + IndexVersion.current(), Build.current(), node, null, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java index 78a89877cbfef..543360fc24d89 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java @@ -7,14 +7,12 @@ package org.elasticsearch.xpack.core.ml.job.config; import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import 
org.elasticsearch.test.AbstractXContentSerializingTestCase; -import org.elasticsearch.test.VersionUtils; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xpack.core.ml.MlConfigVersion; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; @@ -294,7 +292,7 @@ public void testMergeWithJob() { updateBuilder.setPerPartitionCategorizationConfig(new PerPartitionCategorizationConfig(true, randomBoolean())); updateBuilder.setCustomSettings(customSettings); updateBuilder.setModelSnapshotId(randomAlphaOfLength(10)); - updateBuilder.setJobVersion(MlConfigVersion.fromVersion(VersionUtils.randomCompatibleVersion(random(), Version.CURRENT))); + updateBuilder.setJobVersion(MlConfigVersionUtils.randomCompatibleVersion(random())); updateBuilder.setModelPruneWindow(TimeValue.timeValueDays(randomIntBetween(1, 100))); JobUpdate update = updateBuilder.build(); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java index e0b25469d5ab2..ec1e13d033a8b 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java @@ -152,11 +152,14 @@ SearchContext searchContext() { public String toString() { StringBuilder sb = new StringBuilder(); sb.append(this.getClass().getSimpleName()).append("["); - sb.append(", maxPageSize=").append(maxPageSize); + sb.append("maxPageSize=").append(maxPageSize); + describe(sb); sb.append("]"); return sb.toString(); } + protected abstract void describe(StringBuilder sb); + @Override public Operator.Status status() { return new Status(this); @@ -234,11 +237,11 @@ public int pagesEmitted() { return pagesEmitted; } - public int leafPosition() { + public int slicePosition() { return slicePosition; } - public int leafSize() { + public int sliceSize() { return sliceSize; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java index 540cee388efc9..0bbb6571dc4fd 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java @@ -160,4 +160,9 @@ public Page getOutput() { throw new UncheckedIOException(e); } } + + @Override + protected void describe(StringBuilder sb) { + sb.append(", remainingDocs=").append(remainingDocs); + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java index 48389d31e08be..4c6bb50ce9f7f 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java @@ -212,6 +212,12 @@ private Page emit(boolean startEmitting) { ); } + @Override + protected void describe(StringBuilder sb) { + sb.append(", limit=").append(limit); + sb.append(", sorts=").append(sorts); + } + static final class PerShardCollector { private final int shardIndex; private final SearchContext searchContext; diff --git 
a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java index 4808094e116be..5f5cbe2707fe3 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java @@ -40,15 +40,25 @@ public class Driver implements Releasable, Describable { public static final TimeValue DEFAULT_TIME_BEFORE_YIELDING = TimeValue.timeValueMinutes(5); public static final int DEFAULT_MAX_ITERATIONS = 10_000; + /** + * Minimum time between updating status. + */ + public static final TimeValue DEFAULT_STATUS_INTERVAL = TimeValue.timeValueSeconds(1); private final String sessionId; private final DriverContext driverContext; private final Supplier description; private final List activeOperators; private final Releasable releasable; + private final long statusNanos; private final AtomicReference cancelReason = new AtomicReference<>(); private final AtomicReference> blocked = new AtomicReference<>(); + /** + * Status reported to the tasks API. We write the status at most once every + * {@link #statusNanos}, as soon as loop has finished and after {@link #statusNanos} + * have passed. + */ private final AtomicReference status; /** @@ -58,6 +68,7 @@ public class Driver implements Releasable, Describable { * @param source source operator * @param intermediateOperators the chain of operators to execute * @param sink sink operator + * @param statusInterval minimum status reporting interval * @param releasable a {@link Releasable} to invoked once the chain of operators has run to completion */ public Driver( @@ -67,6 +78,7 @@ public Driver( SourceOperator source, List intermediateOperators, SinkOperator sink, + TimeValue statusInterval, Releasable releasable ) { this.sessionId = sessionId; @@ -76,6 +88,7 @@ public Driver( this.activeOperators.add(source); this.activeOperators.addAll(intermediateOperators); this.activeOperators.add(sink); + this.statusNanos = statusInterval.nanos(); this.releasable = releasable; this.status = new AtomicReference<>(new DriverStatus(sessionId, System.currentTimeMillis(), DriverStatus.Status.QUEUED, List.of())); } @@ -95,7 +108,7 @@ public Driver( SinkOperator sink, Releasable releasable ) { - this("unset", driverContext, () -> null, source, intermediateOperators, sink, releasable); + this("unset", driverContext, () -> null, source, intermediateOperators, sink, DEFAULT_STATUS_INTERVAL, releasable); } public DriverContext driverContext() { @@ -110,26 +123,33 @@ public DriverContext driverContext() { private ListenableActionFuture run(TimeValue maxTime, int maxIterations) { long maxTimeNanos = maxTime.nanos(); long startTime = System.nanoTime(); + long nextStatus = startTime + statusNanos; int iter = 0; while (isFinished() == false) { ListenableActionFuture fut = runSingleLoopIteration(); if (fut.isDone() == false) { + status.set(updateStatus(DriverStatus.Status.ASYNC)); return fut; } - if (++iter >= maxIterations) { + if (iter >= maxIterations) { break; } long now = System.nanoTime(); + if (now > nextStatus) { + status.set(updateStatus(DriverStatus.Status.RUNNING)); + nextStatus = now + statusNanos; + } + iter++; if (now - startTime > maxTimeNanos) { break; } } if (isFinished()) { - status.set(updateStatus(DriverStatus.Status.DONE)); // Report status for the tasks API + status.set(updateStatus(DriverStatus.Status.DONE)); driverContext.finish(); 
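The `statusNanos` bookkeeping added to `Driver#run` above throttles how often the task status is published. The standalone sketch below (plain JDK types, invented names) shows the same pattern in isolation: publish at most once per interval while looping, and always publish a final value when the loop ends.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch of interval-throttled status updates, mirroring the idea in Driver#run.
class ThrottledStatusExample {
    private final AtomicReference<String> status = new AtomicReference<>("QUEUED");
    private final long intervalNanos = TimeUnit.SECONDS.toNanos(1);

    void runLoop(int iterations) {
        long nextStatus = System.nanoTime() + intervalNanos;
        for (int iter = 0; iter < iterations; iter++) {
            // ... one unit of work per iteration ...
            long now = System.nanoTime();
            if (now > nextStatus) {
                status.set("RUNNING, iteration " + iter); // at most one update per interval
                nextStatus = now + intervalNanos;
            }
        }
        status.set("DONE"); // the terminal state is always published
    }
}
```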
releasable.close(); } else { - status.set(updateStatus(DriverStatus.Status.RUNNING)); // Report status for the tasks API + status.set(updateStatus(DriverStatus.Status.WAITING)); } return Operator.NOT_BLOCKED; } @@ -227,7 +247,7 @@ private void ensureNotCancelled() { } public static void start(Executor executor, Driver driver, int maxIterations, ActionListener listener) { - driver.status.set(driver.updateStatus(DriverStatus.Status.STARTING)); // Report status for the tasks API + driver.status.set(driver.updateStatus(DriverStatus.Status.STARTING)); schedule(DEFAULT_TIME_BEFORE_YIELDING, maxIterations, executor, driver, listener); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java index 1a33bbbb9ff3a..b3326e395def2 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java @@ -210,6 +210,8 @@ public enum Status implements ToXContentFragment { QUEUED, STARTING, RUNNING, + ASYNC, + WAITING, DONE; @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java index be6a4a3cd19fb..60d5dd394afb7 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java @@ -51,36 +51,36 @@ protected LuceneSourceOperator.Status mutateInstance(LuceneSourceOperator.Status randomValueOtherThan(instance.currentLeaf(), ESTestCase::randomNonNegativeInt), instance.totalLeaves(), instance.pagesEmitted(), - instance.leafPosition(), - instance.leafSize() + instance.slicePosition(), + instance.sliceSize() ); case 1 -> new LuceneSourceOperator.Status( instance.currentLeaf(), randomValueOtherThan(instance.totalLeaves(), ESTestCase::randomNonNegativeInt), instance.pagesEmitted(), - instance.leafPosition(), - instance.leafSize() + instance.slicePosition(), + instance.sliceSize() ); case 2 -> new LuceneSourceOperator.Status( instance.currentLeaf(), instance.totalLeaves(), randomValueOtherThan(instance.pagesEmitted(), ESTestCase::randomNonNegativeInt), - instance.leafPosition(), - instance.leafSize() + instance.slicePosition(), + instance.sliceSize() ); case 3 -> new LuceneSourceOperator.Status( instance.currentLeaf(), instance.totalLeaves(), instance.pagesEmitted(), - randomValueOtherThan(instance.leafPosition(), ESTestCase::randomNonNegativeInt), - instance.leafSize() + randomValueOtherThan(instance.slicePosition(), ESTestCase::randomNonNegativeInt), + instance.sliceSize() ); case 4 -> new LuceneSourceOperator.Status( instance.currentLeaf(), instance.totalLeaves(), instance.pagesEmitted(), - instance.leafPosition(), - randomValueOtherThan(instance.leafSize(), ESTestCase::randomNonNegativeInt) + instance.slicePosition(), + randomValueOtherThan(instance.sliceSize(), ESTestCase::randomNonNegativeInt) ); default -> throw new UnsupportedOperationException(); }; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java index 
c0fc8b26bddf3..8edd2c7c83f64 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java @@ -171,6 +171,7 @@ public static void runDriver(List drivers) { new SequenceLongBlockSourceOperator(LongStream.range(0, between(1, 100)), between(1, 100)), List.of(), new PageConsumerOperator(page -> {}), + Driver.DEFAULT_STATUS_INTERVAL, () -> {} ) ); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java index 40187abb3dae1..af6c89395f245 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java @@ -271,14 +271,32 @@ void runConcurrentTest( String description = "sink-" + i; ExchangeSinkOperator sinkOperator = new ExchangeSinkOperator(exchangeSink.get(), Function.identity()); DriverContext dc = driverContext(); - Driver d = new Driver("test-session:1", dc, () -> description, seqNoGenerator.get(dc), List.of(), sinkOperator, () -> {}); + Driver d = new Driver( + "test-session:1", + dc, + () -> description, + seqNoGenerator.get(dc), + List.of(), + sinkOperator, + Driver.DEFAULT_STATUS_INTERVAL, + () -> {} + ); drivers.add(d); } for (int i = 0; i < numSources; i++) { String description = "source-" + i; ExchangeSourceOperator sourceOperator = new ExchangeSourceOperator(exchangeSource.get()); DriverContext dc = driverContext(); - Driver d = new Driver("test-session:2", dc, () -> description, sourceOperator, List.of(), seqNoCollector.get(dc), () -> {}); + Driver d = new Driver( + "test-session:2", + dc, + () -> description, + sourceOperator, + List.of(), + seqNoCollector.get(dc), + Driver.DEFAULT_STATUS_INTERVAL, + () -> {} + ); drivers.add(d); } PlainActionFuture future = new PlainActionFuture<>(); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java index a5931b15f76a9..cae86e44c69d0 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.compute.lucene.LuceneSourceOperator; @@ -112,54 +113,59 @@ public void setupIndex() throws IOException { bulk.get(); } - @AwaitsFix(bugUrl = "the task status is only updated after max_iterations") public void testTaskContents() throws Exception { ActionFuture response = startEsql(); - getTasksStarting(); - List foundTasks = getTasksRunning(); - int luceneSources = 0; - int valuesSourceReaders = 0; - int exchangeSources = 0; - int exchangeSinks = 0; - for (TaskInfo task : foundTasks) { - DriverStatus status = (DriverStatus) 
task.status(); - assertThat(status.sessionId(), not(emptyOrNullString())); - for (DriverStatus.OperatorStatus o : status.activeOperators()) { - if (o.operator().equals("LuceneSourceOperator[shardId=0, maxPageSize=" + PAGE_SIZE + "]")) { - LuceneSourceOperator.Status oStatus = (LuceneSourceOperator.Status) o.status(); - assertThat(oStatus.currentLeaf(), lessThanOrEqualTo(oStatus.totalLeaves())); - assertThat(oStatus.leafPosition(), lessThanOrEqualTo(oStatus.leafSize())); - luceneSources++; - continue; - } - if (o.operator().equals("ValuesSourceReaderOperator[field = pause_me]")) { - ValuesSourceReaderOperator.Status oStatus = (ValuesSourceReaderOperator.Status) o.status(); - assertThat(oStatus.readersBuilt(), equalTo(Map.of("LongValuesReader", 1))); - assertThat(oStatus.pagesProcessed(), greaterThanOrEqualTo(1)); - valuesSourceReaders++; - continue; - } - if (o.operator().equals("ExchangeSourceOperator")) { - ExchangeSourceOperator.Status oStatus = (ExchangeSourceOperator.Status) o.status(); - assertThat(oStatus.pagesWaiting(), greaterThanOrEqualTo(0)); - assertThat(oStatus.pagesEmitted(), greaterThanOrEqualTo(0)); - exchangeSources++; - continue; - } - if (o.operator().equals("ExchangeSinkOperator")) { - ExchangeSinkOperator.Status oStatus = (ExchangeSinkOperator.Status) o.status(); - assertThat(oStatus.pagesAccepted(), greaterThanOrEqualTo(0)); - exchangeSinks++; + try { + getTasksStarting(); + scriptPermits.release(PAGE_SIZE); + List foundTasks = getTasksRunning(); + int luceneSources = 0; + int valuesSourceReaders = 0; + int exchangeSources = 0; + int exchangeSinks = 0; + for (TaskInfo task : foundTasks) { + DriverStatus status = (DriverStatus) task.status(); + assertThat(status.sessionId(), not(emptyOrNullString())); + for (DriverStatus.OperatorStatus o : status.activeOperators()) { + if (o.operator().startsWith("LuceneSourceOperator[maxPageSize=" + PAGE_SIZE)) { + LuceneSourceOperator.Status oStatus = (LuceneSourceOperator.Status) o.status(); + assertThat(oStatus.currentLeaf(), lessThanOrEqualTo(oStatus.totalLeaves())); + assertThat(oStatus.slicePosition(), greaterThanOrEqualTo(0)); + if (oStatus.sliceSize() != 0) { + assertThat(oStatus.slicePosition(), lessThanOrEqualTo(oStatus.sliceSize())); + } + luceneSources++; + continue; + } + if (o.operator().equals("ValuesSourceReaderOperator[field = pause_me]")) { + ValuesSourceReaderOperator.Status oStatus = (ValuesSourceReaderOperator.Status) o.status(); + assertThat(oStatus.readersBuilt(), equalTo(Map.of("LongValuesReader", 1))); + assertThat(oStatus.pagesProcessed(), greaterThanOrEqualTo(1)); + valuesSourceReaders++; + continue; + } + if (o.operator().equals("ExchangeSourceOperator")) { + ExchangeSourceOperator.Status oStatus = (ExchangeSourceOperator.Status) o.status(); + assertThat(oStatus.pagesWaiting(), greaterThanOrEqualTo(0)); + assertThat(oStatus.pagesEmitted(), greaterThanOrEqualTo(0)); + exchangeSources++; + continue; + } + if (o.operator().equals("ExchangeSinkOperator")) { + ExchangeSinkOperator.Status oStatus = (ExchangeSinkOperator.Status) o.status(); + assertThat(oStatus.pagesAccepted(), greaterThanOrEqualTo(0)); + exchangeSinks++; + } } } + assertThat(luceneSources, greaterThanOrEqualTo(1)); + assertThat(valuesSourceReaders, equalTo(1)); + assertThat(exchangeSinks, greaterThanOrEqualTo(1)); + assertThat(exchangeSources, equalTo(1)); + } finally { + scriptPermits.release(Integer.MAX_VALUE); + assertThat(Iterators.flatMap(response.get().values(), i -> i).next(), equalTo((long) NUM_DOCS)); } - assertThat(luceneSources, 
greaterThanOrEqualTo(1)); - assertThat(valuesSourceReaders, equalTo(1)); - assertThat(exchangeSinks, greaterThanOrEqualTo(1)); - assertThat(exchangeSources, equalTo(1)); - - scriptPermits.release(Integer.MAX_VALUE); - assertThat(response.get().values(), equalTo(List.of(List.of((long) NUM_DOCS)))); } public void testCancelRead() throws Exception { @@ -194,8 +200,17 @@ public void testCancelEsqlTask() throws Exception { private ActionFuture startEsql() { scriptPermits.drainPermits(); - scriptPermits.release(between(1, 10)); - var pragmas = new QueryPragmas(Settings.builder().put("data_partitioning", "shard").put("page_size", PAGE_SIZE).build()); + scriptPermits.release(between(1, 5)); + var pragmas = new QueryPragmas( + Settings.builder() + // Force shard partitioning because that's all the tests know how to match. It is easier to reason about too. + .put("data_partitioning", "shard") + // Limit the page size to something small so we do more than one page worth of work, so we get more status updates. + .put("page_size", PAGE_SIZE) + // Report the status after every action + .put("status_interval", "0ms") + .build() + ); return new EsqlQueryRequestBuilder(client(), EsqlQueryAction.INSTANCE).query("from test | stats sum(pause_me)") .pragmas(pragmas) .execute(); @@ -233,7 +248,13 @@ private List getTasksStarting() throws Exception { assertThat(task.description(), either(equalTo(READ_DESCRIPTION)).or(equalTo(MERGE_DESCRIPTION))); DriverStatus status = (DriverStatus) task.status(); logger.info("{}", status.status()); - assertThat(status.status(), equalTo(DriverStatus.Status.STARTING)); + /* + * Accept tasks that are either starting or have gone + * immediately async. The coordinating task is likely + * to have done the latter and the reading task should + * have done the former. + */ + assertThat(status.status(), either(equalTo(DriverStatus.Status.STARTING)).or(equalTo(DriverStatus.Status.ASYNC))); } foundTasks.addAll(tasks); }); @@ -256,10 +277,13 @@ private List getTasksRunning() throws Exception { assertThat(tasks, hasSize(equalTo(2))); for (TaskInfo task : tasks) { assertThat(task.action(), equalTo(DriverTaskRunner.ACTION_NAME)); - assertThat(task.description(), either(equalTo(READ_DESCRIPTION)).or(equalTo(MERGE_DESCRIPTION))); DriverStatus status = (DriverStatus) task.status(); - // TODO: Running is not after one iteration? 
- assertThat(status.status(), equalTo(DriverStatus.Status.STARTING)); + assertThat(task.description(), either(equalTo(READ_DESCRIPTION)).or(equalTo(MERGE_DESCRIPTION))); + if (task.description().equals(READ_DESCRIPTION)) { + assertThat(status.status(), equalTo(DriverStatus.Status.RUNNING)); + } else { + assertThat(status.status(), equalTo(DriverStatus.Status.ASYNC)); + } } foundTasks.addAll(tasks); }); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java index 42c705c8d9a76..df7058e28fb43 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java @@ -213,6 +213,7 @@ private void doLookup( queryOperator, intermediateOperators, outputOperator, + Driver.DEFAULT_STATUS_INTERVAL, searchContext ); task.addListener(() -> { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index cd653c64213c3..d5b8b6df1db8c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -41,6 +41,7 @@ import org.elasticsearch.compute.operator.topn.TopNOperator; import org.elasticsearch.compute.operator.topn.TopNOperator.TopNOperatorFactory; import org.elasticsearch.core.Releasables; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; @@ -149,7 +150,10 @@ public LocalExecutionPlan plan(PhysicalPlan node) { PhysicalOperation physicalOperation = plan(node, context); context.addDriverFactory( - new DriverFactory(new DriverSupplier(context.bigArrays, physicalOperation), context.driverParallelism().get()) + new DriverFactory( + new DriverSupplier(context.bigArrays, physicalOperation, configuration.pragmas().statusInterval()), + context.driverParallelism().get() + ) ); return new LocalExecutionPlan(context.driverFactories); @@ -679,8 +683,10 @@ int pageSize(Integer estimatedRowSize) { } } - record DriverSupplier(BigArrays bigArrays, PhysicalOperation physicalOperation) implements Function, Describable { - + record DriverSupplier(BigArrays bigArrays, PhysicalOperation physicalOperation, TimeValue statusInterval) + implements + Function, + Describable { @Override public Driver apply(String sessionId) { SourceOperator source = null; @@ -693,7 +699,7 @@ public Driver apply(String sessionId) { physicalOperation.operators(operators, driverContext); sink = physicalOperation.sink(driverContext); success = true; - return new Driver(sessionId, driverContext, physicalOperation::describe, source, operators, sink, () -> {}); + return new Driver(sessionId, driverContext, physicalOperation::describe, source, operators, sink, statusInterval, () -> {}); } finally { if (false == success) { Releasables.close(source, () -> Releasables.close(operators), sink); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java index bb1f669dc2b43..602e04ff08f6c 
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java
index bb1f669dc2b43..602e04ff08f6c 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java
@@ -14,6 +14,9 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.compute.lucene.DataPartitioning;
+import org.elasticsearch.compute.operator.Driver;
+import org.elasticsearch.compute.operator.DriverStatus;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
@@ -42,6 +45,12 @@ public final class QueryPragmas implements Writeable {
      */
     public static final Setting<Integer> PAGE_SIZE = Setting.intSetting("page_size", 0, 0);
 
+    /**
+     * The minimum interval between syncs of the {@link DriverStatus}, making
+     * the status available to task API.
+     */
+    public static final Setting<TimeValue> STATUS_INTERVAL = Setting.timeSetting("status_interval", Driver.DEFAULT_STATUS_INTERVAL);
+
     public static final QueryPragmas EMPTY = new QueryPragmas(Settings.EMPTY);
 
     private final Settings settings;
@@ -87,6 +96,14 @@ public int pageSize() {
         return PAGE_SIZE.get(settings);
     }
 
+    /**
+     * The minimum interval between syncs of the {@link DriverStatus}, making
+     * the status available to task API.
+     */
+    public TimeValue statusInterval() {
+        return STATUS_INTERVAL.get(settings);
+    }
+
     public boolean isEmpty() {
         return settings.isEmpty();
     }
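The new `status_interval` pragma is a per-request time setting that defaults to `Driver.DEFAULT_STATUS_INTERVAL`. A minimal sketch of setting and reading it, using only the constructor and accessor shown in this diff (the 500ms value is illustrative, not taken from the PR):

```java
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;

class StatusIntervalPragmaExample {
    static TimeValue exampleStatusInterval() {
        // Ask for DriverStatus syncs at most every 500ms for this query.
        QueryPragmas pragmas = new QueryPragmas(Settings.builder().put("status_interval", "500ms").build());
        return pragmas.statusInterval(); // TimeValue.timeValueMillis(500)
    }
}
```

In a real request the pragmas would be attached to the query, as the test above does with `EsqlQueryRequestBuilder...pragmas(pragmas)`.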
diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/enrollment/TransportNodeEnrollmentActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/enrollment/TransportNodeEnrollmentActionTests.java
index 6e31d55c81d29..6144ae74c7692 100644
--- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/enrollment/TransportNodeEnrollmentActionTests.java
+++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/enrollment/TransportNodeEnrollmentActionTests.java
@@ -27,6 +27,7 @@
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.env.Environment;
+import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -103,6 +104,7 @@ public void testDoExecute() throws Exception {
                 new NodeInfo(
                     Version.CURRENT,
                     TransportVersion.current(),
+                    IndexVersion.current(),
                     null,
                     n,
                     null,
diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/enrollment/InternalEnrollmentTokenGeneratorTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/enrollment/InternalEnrollmentTokenGeneratorTests.java
index 448265f7efcf4..02b088e245120 100644
--- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/enrollment/InternalEnrollmentTokenGeneratorTests.java
+++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/enrollment/InternalEnrollmentTokenGeneratorTests.java
@@ -26,6 +26,7 @@
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.http.HttpInfo;
+import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.FixedExecutorBuilder;
@@ -231,6 +232,7 @@ public Answer answerNullHttpInfo(InvocationOnMock invocationO
             new NodeInfo(
                 Version.CURRENT,
                 TransportVersion.current(),
+                IndexVersion.current(),
                 null,
                 DiscoveryNodeUtils.builder("1").name("node-name").roles(Set.of()).build(),
                 null,
@@ -264,6 +266,7 @@ private Answer answerWithInfo(InvocationOnMock invocationOnMo
             new NodeInfo(
                 Version.CURRENT,
                 TransportVersion.current(),
+                IndexVersion.current(),
                 null,
                 DiscoveryNodeUtils.builder("1").name("node-name").roles(Set.of()).build(),
                 null,