diff --git a/build-tools-internal/gradle/wrapper/gradle-wrapper.properties b/build-tools-internal/gradle/wrapper/gradle-wrapper.properties index 6c7fa4d4653d2..01f330a93e8fa 100644 --- a/build-tools-internal/gradle/wrapper/gradle-wrapper.properties +++ b/build-tools-internal/gradle/wrapper/gradle-wrapper.properties @@ -1,7 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionSha256Sum=bb09982fdf52718e4c7b25023d10df6d35a5fff969860bdf5a5bd27a3ab27a9e -distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-all.zip +distributionSha256Sum=f2b9ed0faf8472cbe469255ae6c86eddb77076c75191741b4a462f33128dd419 +distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-all.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/build-tools-internal/src/main/resources/minimumGradleVersion b/build-tools-internal/src/main/resources/minimumGradleVersion index 223a939307878..fad03000495ca 100644 --- a/build-tools-internal/src/main/resources/minimumGradleVersion +++ b/build-tools-internal/src/main/resources/minimumGradleVersion @@ -1 +1 @@ -8.3 \ No newline at end of file +8.4 \ No newline at end of file diff --git a/distribution/tools/plugin-cli/src/main/java/org/elasticsearch/plugins/cli/InstallPluginAction.java b/distribution/tools/plugin-cli/src/main/java/org/elasticsearch/plugins/cli/InstallPluginAction.java index d32cbd8dd1736..c7bee4a6c172d 100644 --- a/distribution/tools/plugin-cli/src/main/java/org/elasticsearch/plugins/cli/InstallPluginAction.java +++ b/distribution/tools/plugin-cli/src/main/java/org/elasticsearch/plugins/cli/InstallPluginAction.java @@ -23,7 +23,6 @@ import org.bouncycastle.openpgp.operator.jcajce.JcaKeyFingerprintCalculator; import org.bouncycastle.openpgp.operator.jcajce.JcaPGPContentVerifierBuilderProvider; import org.elasticsearch.Build; -import org.elasticsearch.Version; import org.elasticsearch.bootstrap.PluginPolicyInfo; import org.elasticsearch.bootstrap.PolicyUtil; import org.elasticsearch.cli.ExitCodes; @@ -84,6 +83,8 @@ import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.zip.ZipEntry; @@ -303,7 +304,7 @@ private Path download(InstallablePlugin plugin, Path tmpDir) throws Exception { // else carry on to regular download } - final String url = getElasticUrl(getStagingHash(), Version.CURRENT, isSnapshot(), pluginId, Platforms.PLATFORM_NAME); + final String url = getElasticUrl(getStagingHash(), isSnapshot(), pluginId, Platforms.PLATFORM_NAME); terminal.println(logPrefix + "Downloading " + pluginId + " from elastic"); return downloadAndValidate(url, tmpDir, true); } @@ -341,7 +342,7 @@ private Path getPluginArchivePath(String pluginId, String pluginArchiveDir) thro if (Files.isDirectory(path) == false) { throw new UserException(ExitCodes.CONFIG, "Location in ES_PLUGIN_ARCHIVE_DIR is not a directory"); } - return PathUtils.get(pluginArchiveDir, pluginId + "-" + Version.CURRENT + (isSnapshot() ? "-SNAPSHOT" : "") + ".zip"); + return PathUtils.get(pluginArchiveDir, pluginId + "-" + Build.current().qualifiedVersion() + ".zip"); } // pkg private so tests can override @@ -356,13 +357,8 @@ boolean isSnapshot() { /** * Returns the url for an official elasticsearch plugin. 
*/ - private String getElasticUrl( - final String stagingHash, - final Version version, - final boolean isSnapshot, - final String pluginId, - final String platform - ) throws IOException, UserException { + private String getElasticUrl(final String stagingHash, final boolean isSnapshot, final String pluginId, final String platform) + throws IOException, UserException { final String baseUrl; if (isSnapshot && stagingHash == null) { throw new UserException( @@ -370,11 +366,21 @@ private String getElasticUrl( "attempted to install release build of official plugin on snapshot build of Elasticsearch" ); } + // assumption: we will only be publishing plugins to snapshot or staging when they're versioned + String semanticVersion = getSemanticVersion(Build.current().version()); + if (semanticVersion == null) { + throw new UserException( + ExitCodes.CONFIG, + "attempted to download a plugin for a non-semantically-versioned build of Elasticsearch: [" + + Build.current().version() + + "]" + ); + } if (stagingHash != null) { if (isSnapshot) { - baseUrl = nonReleaseUrl("snapshots", version, stagingHash, pluginId); + baseUrl = nonReleaseUrl("snapshots", semanticVersion, stagingHash, pluginId); } else { - baseUrl = nonReleaseUrl("staging", version, stagingHash, pluginId); + baseUrl = nonReleaseUrl("staging", semanticVersion, stagingHash, pluginId); } } else { baseUrl = String.format(Locale.ROOT, "https://artifacts.elastic.co/downloads/elasticsearch-plugins/%s", pluginId); @@ -393,7 +399,7 @@ private String getElasticUrl( return String.format(Locale.ROOT, "%s/%s-%s.zip", baseUrl, pluginId, Build.current().qualifiedVersion()); } - private static String nonReleaseUrl(final String hostname, final Version version, final String stagingHash, final String pluginId) { + private static String nonReleaseUrl(final String hostname, final String version, final String stagingHash, final String pluginId) { return String.format( Locale.ROOT, "https://%s.elastic.co/%s-%s/downloads/elasticsearch-plugins/%s", @@ -1088,4 +1094,9 @@ private static void setFileAttributes(final Path path, final Set getPluginsToUpgrade( throw new RuntimeException("Couldn't find a PluginInfo for [" + eachPluginId + "], which should be impossible"); }); - if (info.getElasticsearchVersion().before(Version.CURRENT)) { + if (info.getElasticsearchVersion().toString().equals(Build.current().version()) == false) { this.terminal.println( Terminal.Verbosity.VERBOSE, String.format( Locale.ROOT, - "Official plugin [%s] is out-of-date (%s versus %s), upgrading", + "Official plugin [%s] is out-of-sync (%s versus %s), upgrading", eachPluginId, info.getElasticsearchVersion(), - Version.CURRENT + Build.current().version() ) ); return true; @@ -278,14 +278,14 @@ private List getExistingPlugins() throws PluginSyncException { // Check for a version mismatch, unless it's an official plugin since we can upgrade them. 
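Editorial sketch: the new getSemanticVersion helper referenced above is exercised by testGetSemanticVersion later in this diff; a version consistent with those expectations and with the java.util.regex imports added above (a hypothetical reconstruction, the PR's exact regex may differ) would be:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    final class SemanticVersions {
        // "1.2.3" -> "1.2.3"; "1.2.3-SNAPSHOT", "1.2.3foobar", "1.2.3.4" -> "1.2.3";
        // "1.2", "foo", "foo-1.2.3" -> null (input must start with a full x.y.z triple).
        static String getSemanticVersion(String version) {
            Matcher matcher = Pattern.compile("^(\\d+\\.\\d+\\.\\d+)\\D?.*$").matcher(version);
            return matcher.matches() ? matcher.group(1) : null;
        }
    }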
if (InstallPluginAction.OFFICIAL_PLUGINS.contains(info.getName()) - && info.getElasticsearchVersion().equals(Version.CURRENT) == false) { + && info.getElasticsearchVersion().toString().equals(Build.current().version()) == false) { this.terminal.errorPrintln( String.format( Locale.ROOT, "WARNING: plugin [%s] was built for Elasticsearch version %s but version %s is required", info.getName(), info.getElasticsearchVersion(), - Version.CURRENT + Build.current().version() ) ); } diff --git a/distribution/tools/plugin-cli/src/test/java/org/elasticsearch/plugins/cli/InstallPluginActionTests.java b/distribution/tools/plugin-cli/src/test/java/org/elasticsearch/plugins/cli/InstallPluginActionTests.java index 2a66ed3cf4349..2da05d87f831f 100644 --- a/distribution/tools/plugin-cli/src/test/java/org/elasticsearch/plugins/cli/InstallPluginActionTests.java +++ b/distribution/tools/plugin-cli/src/test/java/org/elasticsearch/plugins/cli/InstallPluginActionTests.java @@ -32,7 +32,6 @@ import org.bouncycastle.openpgp.operator.jcajce.JcePBESecretKeyDecryptorBuilder; import org.bouncycastle.openpgp.operator.jcajce.JcePBESecretKeyEncryptorBuilder; import org.elasticsearch.Build; -import org.elasticsearch.Version; import org.elasticsearch.cli.ExitCodes; import org.elasticsearch.cli.MockTerminal; import org.elasticsearch.cli.ProcessInfo; @@ -111,6 +110,7 @@ import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -298,7 +298,7 @@ private static String[] pluginProperties(String name, String[] additionalProps, "version", "1.0", "elasticsearch.version", - Version.CURRENT.toString(), + InstallPluginAction.getSemanticVersion(Build.current().version()), "java.version", System.getProperty("java.specification.version") @@ -724,7 +724,7 @@ public void testPluginPermissions() throws Exception { final Path platformBinDir = platformNameDir.resolve("bin"); Files.createDirectories(platformBinDir); - Files.createFile(tempPluginDir.resolve("fake-" + Version.CURRENT.toString() + ".jar")); + Files.createFile(tempPluginDir.resolve("fake-" + Build.current().version() + ".jar")); Files.createFile(platformBinDir.resolve("fake_executable")); Files.createDirectory(resourcesDir); Files.createFile(resourcesDir.resolve("resource")); @@ -740,7 +740,7 @@ public void testPluginPermissions() throws Exception { final Path platformName = platform.resolve("linux-x86_64"); final Path bin = platformName.resolve("bin"); assert755(fake); - assert644(fake.resolve("fake-" + Version.CURRENT + ".jar")); + assert644(fake.resolve("fake-" + Build.current().version() + ".jar")); assert755(resources); assert644(resources.resolve("resource")); assert755(platform); @@ -1110,8 +1110,8 @@ public void testOfficialPluginSnapshot() throws Exception { String url = String.format( Locale.ROOT, "https://snapshots.elastic.co/%s-abc123/downloads/elasticsearch-plugins/analysis-icu/analysis-icu-%s.zip", - Version.CURRENT, - Build.current().qualifiedVersion() + InstallPluginAction.getSemanticVersion(Build.current().version()), + Build.current().version() ); assertInstallPluginFromUrl("analysis-icu", url, "abc123", true); } @@ -1120,8 +1120,8 @@ public void testInstallReleaseBuildOfPluginOnSnapshotBuild() { String url = String.format( Locale.ROOT, 
"https://snapshots.elastic.co/%s-abc123/downloads/elasticsearch-plugins/analysis-icu/analysis-icu-%s.zip", - Version.CURRENT, - Build.current().qualifiedVersion() + InstallPluginAction.getSemanticVersion(Build.current().version()), + Build.current().version() ); // attempting to install a release build of a plugin (no staging ID) on a snapshot build should throw a user exception final UserException e = expectThrows( @@ -1137,9 +1137,9 @@ public void testInstallReleaseBuildOfPluginOnSnapshotBuild() { public void testOfficialPluginStaging() throws Exception { String url = "https://staging.elastic.co/" - + Version.CURRENT + + InstallPluginAction.getSemanticVersion(Build.current().version()) + "-abc123/downloads/elasticsearch-plugins/analysis-icu/analysis-icu-" - + Build.current().qualifiedVersion() + + Build.current().version() + ".zip"; assertInstallPluginFromUrl("analysis-icu", url, "abc123", false); } @@ -1148,7 +1148,7 @@ public void testOfficialPlatformPlugin() throws Exception { String url = "https://artifacts.elastic.co/downloads/elasticsearch-plugins/analysis-icu/analysis-icu-" + Platforms.PLATFORM_NAME + "-" - + Build.current().qualifiedVersion() + + Build.current().version() + ".zip"; assertInstallPluginFromUrl("analysis-icu", url, null, false); } @@ -1157,16 +1157,16 @@ public void testOfficialPlatformPluginSnapshot() throws Exception { String url = String.format( Locale.ROOT, "https://snapshots.elastic.co/%s-abc123/downloads/elasticsearch-plugins/analysis-icu/analysis-icu-%s-%s.zip", - Version.CURRENT, + InstallPluginAction.getSemanticVersion(Build.current().version()), Platforms.PLATFORM_NAME, - Build.current().qualifiedVersion() + Build.current().version() ); assertInstallPluginFromUrl("analysis-icu", url, "abc123", true); } public void testOfficialPlatformPluginStaging() throws Exception { String url = "https://staging.elastic.co/" - + Version.CURRENT + + InstallPluginAction.getSemanticVersion(Build.current().version()) + "-abc123/downloads/elasticsearch-plugins/analysis-icu/analysis-icu-" + Platforms.PLATFORM_NAME + "-" @@ -1580,6 +1580,17 @@ public void testStablePluginWithoutNamedComponentsFile() throws Exception { assertNamedComponentFile("stable1", env.v2().pluginsFile(), namedComponentsJSON()); } + public void testGetSemanticVersion() { + assertThat(InstallPluginAction.getSemanticVersion("1.2.3"), equalTo("1.2.3")); + assertThat(InstallPluginAction.getSemanticVersion("123.456.789"), equalTo("123.456.789")); + assertThat(InstallPluginAction.getSemanticVersion("1.2.3-SNAPSHOT"), equalTo("1.2.3")); + assertThat(InstallPluginAction.getSemanticVersion("1.2.3foobar"), equalTo("1.2.3")); + assertThat(InstallPluginAction.getSemanticVersion("1.2.3.4"), equalTo("1.2.3")); + assertThat(InstallPluginAction.getSemanticVersion("1.2"), nullValue()); + assertThat(InstallPluginAction.getSemanticVersion("foo"), nullValue()); + assertThat(InstallPluginAction.getSemanticVersion("foo-1.2.3"), nullValue()); + } + private Map> namedComponentsMap() { Map> result = new LinkedHashMap<>(); Map extensibles = new LinkedHashMap<>(); diff --git a/distribution/tools/plugin-cli/src/test/java/org/elasticsearch/plugins/cli/ListPluginsCommandTests.java b/distribution/tools/plugin-cli/src/test/java/org/elasticsearch/plugins/cli/ListPluginsCommandTests.java index e1577f7d101be..b225bc441794a 100644 --- a/distribution/tools/plugin-cli/src/test/java/org/elasticsearch/plugins/cli/ListPluginsCommandTests.java +++ 
b/distribution/tools/plugin-cli/src/test/java/org/elasticsearch/plugins/cli/ListPluginsCommandTests.java @@ -215,7 +215,7 @@ public void testExistingIncompatiblePlugin() throws Exception { "version", "1.0", "elasticsearch.version", - Version.fromString("1.0.0").toString(), + "1.0.0", "java.version", System.getProperty("java.specification.version"), "classname", diff --git a/distribution/tools/plugin-cli/src/test/java/org/elasticsearch/plugins/cli/SyncPluginsActionTests.java b/distribution/tools/plugin-cli/src/test/java/org/elasticsearch/plugins/cli/SyncPluginsActionTests.java index 2c200df2a7d56..9802b4039bb7b 100644 --- a/distribution/tools/plugin-cli/src/test/java/org/elasticsearch/plugins/cli/SyncPluginsActionTests.java +++ b/distribution/tools/plugin-cli/src/test/java/org/elasticsearch/plugins/cli/SyncPluginsActionTests.java @@ -8,7 +8,7 @@ package org.elasticsearch.plugins.cli; import org.apache.lucene.tests.util.LuceneTestCase; -import org.elasticsearch.Version; +import org.elasticsearch.Build; import org.elasticsearch.cli.MockTerminal; import org.elasticsearch.cli.UserException; import org.elasticsearch.common.settings.Settings; @@ -17,6 +17,7 @@ import org.elasticsearch.plugins.PluginTestUtil; import org.elasticsearch.plugins.cli.SyncPluginsAction.PluginChanges; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.hamcrest.Matchers; import org.junit.Before; import org.mockito.InOrder; @@ -26,6 +27,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.List; +import java.util.Objects; import java.util.Optional; import static org.hamcrest.Matchers.containsString; @@ -129,7 +131,7 @@ public void test_getPluginChanges_withPluginToInstall_returnsPluginToInstall() t * since we can't automatically upgrade it. */ public void test_getPluginChanges_withPluginToUpgrade_returnsNoChanges() throws Exception { - createPlugin("my-plugin", Version.CURRENT.previousMajor()); + createPlugin("my-plugin", VersionUtils.getPreviousVersion().toString()); config.setPlugins(List.of(new InstallablePlugin("my-plugin"))); final PluginChanges pluginChanges = action.getPluginChanges(config, Optional.empty()); @@ -142,7 +144,7 @@ public void test_getPluginChanges_withPluginToUpgrade_returnsNoChanges() throws * but needs to be upgraded, then we calculate that the plugin needs to be upgraded. */ public void test_getPluginChanges_withOfficialPluginToUpgrade_returnsPluginToUpgrade() throws Exception { - createPlugin("analysis-icu", Version.CURRENT.previousMajor()); + createPlugin("analysis-icu", VersionUtils.getPreviousVersion().toString()); config.setPlugins(List.of(new InstallablePlugin("analysis-icu"))); final PluginChanges pluginChanges = action.getPluginChanges(config, Optional.empty()); @@ -329,10 +331,11 @@ public void test_performSync_withPluginsToUpgrade_callsUpgradeAction() throws Ex } private void createPlugin(String name) throws IOException { - createPlugin(name, Version.CURRENT); + String semanticVersion = InstallPluginAction.getSemanticVersion(Build.current().version()); + createPlugin(name, Objects.nonNull(semanticVersion) ? 
semanticVersion : Build.current().version()); } - private void createPlugin(String name, Version version) throws IOException { + private void createPlugin(String name, String version) throws IOException { PluginTestUtil.writePluginProperties( env.pluginsFile().resolve(name), "description", @@ -342,7 +345,7 @@ private void createPlugin(String name, Version version) throws IOException { "version", "1.0", "elasticsearch.version", - version.toString(), + version, "java.version", System.getProperty("java.specification.version"), "classname", diff --git a/docs/changelog/100519.yaml b/docs/changelog/100519.yaml new file mode 100644 index 0000000000000..086c6962b3a95 --- /dev/null +++ b/docs/changelog/100519.yaml @@ -0,0 +1,5 @@ +pr: 100519 +summary: Disallow vectors whose magnitudes will not fit in a float +area: Vector Search +type: bug +issues: [] diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 6c7fa4d4653d2..01f330a93e8fa 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,7 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionSha256Sum=bb09982fdf52718e4c7b25023d10df6d35a5fff969860bdf5a5bd27a3ab27a9e -distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-all.zip +distributionSha256Sum=f2b9ed0faf8472cbe469255ae6c86eddb77076c75191741b4a462f33128dd419 +distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-all.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/gradlew b/gradlew index 0adc8e1a53214..1aa94a4269074 100755 --- a/gradlew +++ b/gradlew @@ -145,7 +145,7 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then case $MAX_FD in #( max*) # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. - # shellcheck disable=SC3045 + # shellcheck disable=SC2039,SC3045 MAX_FD=$( ulimit -H -n ) || warn "Could not query maximum file descriptor limit" esac @@ -153,7 +153,7 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then '' | soft) :;; #( *) # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. - # shellcheck disable=SC3045 + # shellcheck disable=SC2039,SC3045 ulimit -n "$MAX_FD" || warn "Could not set maximum file descriptor limit to $MAX_FD" esac @@ -202,11 +202,11 @@ fi # Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' -# Collect all arguments for the java command; -# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of -# shell script including quotes and variable substitutions, so put them in -# double quotes to make sure that they get re-expanded; and -# * put everything else in single quotes, so that it's not re-expanded. +# Collect all arguments for the java command: +# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, +# and any embedded shellness will be escaped. +# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be +# treated as '${Hostname}' itself on the command line. 
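Context for the wrapper bumps above (all three gradle-wrapper.properties files move to Gradle 8.4): distributionSha256Sum pins the digest that the wrapper checks the downloaded distribution against before unpacking it. A minimal sketch of that kind of verification (hypothetical helper, not the wrapper's actual code):

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.security.MessageDigest;
    import java.util.HexFormat;

    final class WrapperChecksums {
        // Compare a downloaded archive's SHA-256 digest against the pinned property value.
        static boolean matchesPinnedSha256(Path distributionZip, String expectedHex) throws Exception {
            byte[] digest = MessageDigest.getInstance("SHA-256").digest(Files.readAllBytes(distributionZip));
            return HexFormat.of().formatHex(digest).equalsIgnoreCase(expectedHex);
        }
    }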
set -- \ "-Dorg.gradle.appname=$APP_BASE_NAME" \ diff --git a/plugins/examples/gradle/wrapper/gradle-wrapper.properties b/plugins/examples/gradle/wrapper/gradle-wrapper.properties index 6c7fa4d4653d2..01f330a93e8fa 100644 --- a/plugins/examples/gradle/wrapper/gradle-wrapper.properties +++ b/plugins/examples/gradle/wrapper/gradle-wrapper.properties @@ -1,7 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionSha256Sum=bb09982fdf52718e4c7b25023d10df6d35a5fff969860bdf5a5bd27a3ab27a9e -distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-all.zip +distributionSha256Sum=f2b9ed0faf8472cbe469255ae6c86eddb77076c75191741b4a462f33128dd419 +distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-all.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.vectors/60_dense_vector_dynamic_mapping.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.vectors/60_dense_vector_dynamic_mapping.yml index d2c02fcbff38e..4ef700f807c13 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.vectors/60_dense_vector_dynamic_mapping.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.vectors/60_dense_vector_dynamic_mapping.yml @@ -3,6 +3,30 @@ setup: version: ' - 8.10.99' reason: 'Dynamic mapping of floats to dense_vector was added in 8.11' + # Additional logging for issue: https://github.com/elastic/elasticsearch/issues/100502 + - do: + cluster.put_settings: + body: > + { + "persistent": { + "logger.org.elasticsearch.index": "TRACE" + } + } + +--- +teardown: + - skip: + version: ' - 8.10.99' + reason: 'Dynamic mapping of floats to dense_vector was added in 8.11' + + - do: + cluster.put_settings: + body: > + { + "persistent": { + "logger.org.elasticsearch.index": null + } + } --- "Fields with float arrays below the threshold still map as float": diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 5d51a7959b5fa..6d323c4fc2ea7 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -134,6 +134,7 @@ static TransportVersion def(int id) { public static final TransportVersion NODE_INFO_REQUEST_SIMPLIFIED = def(8_510_00_0); public static final TransportVersion NESTED_KNN_VECTOR_QUERY_V = def(8_511_00_0); public static final TransportVersion ML_PACKAGE_LOADER_PLATFORM_ADDED = def(8_512_00_0); + public static final TransportVersion ELSER_SERVICE_MODEL_VERSION_ADDED_PATCH = def(8_512_00_1); public static final TransportVersion PLUGIN_DESCRIPTOR_OPTIONAL_CLASSNAME = def(8_513_00_0); public static final TransportVersion UNIVERSAL_PROFILING_LICENSE_ADDED = def(8_514_00_0); public static final TransportVersion ELSER_SERVICE_MODEL_VERSION_ADDED = def(8_515_00_0); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java index bd9382aeaa758..1a626fe4dce31 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java @@ -213,6 +213,8 
@@ public void onFailure(Exception e) { public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { startedCleanup = true; logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId); + // We fork here just to call SnapshotsService#minCompatibleVersion (which may be to expensive to run directly) but + // BlobStoreRepository#cleanup forks again straight away. TODO reduce the forking here. threadPool.executor(ThreadPool.Names.SNAPSHOT) .execute( ActionRunnable.wrap( diff --git a/server/src/main/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapper.java index deb178ff724bb..ee144b25f4507 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapper.java @@ -458,6 +458,15 @@ void checkVectorMagnitude( ) { StringBuilder errorBuilder = null; + if (Float.isNaN(squaredMagnitude) || Float.isInfinite(squaredMagnitude)) { + errorBuilder = new StringBuilder( + "NaN or Infinite magnitude detected, this usually means the vector values are too extreme to fit within a float." + ); + } + if (errorBuilder != null) { + throw new IllegalArgumentException(appender.apply(errorBuilder).toString()); + } + if (similarity == VectorSimilarity.DOT_PRODUCT && Math.abs(squaredMagnitude - 1.0f) > 1e-4f) { errorBuilder = new StringBuilder( "The [" + VectorSimilarity.DOT_PRODUCT + "] similarity can only be used with unit-length vectors." @@ -886,7 +895,9 @@ public Query createKnnQuery( } elementType.checkVectorBounds(queryVector); - if (similarity == VectorSimilarity.DOT_PRODUCT || similarity == VectorSimilarity.COSINE) { + if (similarity == VectorSimilarity.DOT_PRODUCT + || similarity == VectorSimilarity.COSINE + || similarity == VectorSimilarity.MAX_INNER_PRODUCT) { float squaredMagnitude = VectorUtil.dotProduct(queryVector, queryVector); elementType.checkVectorMagnitude(similarity, ElementType.errorFloatElementsAppender(queryVector), squaredMagnitude); } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 1e2969b255877..39d11e9d9a4f3 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -849,6 +849,33 @@ public void onFailure(Exception e) { }); } + /** + * Runs cleanup actions on the repository. Increments the repository state id by one before executing any modifications on the + * repository. + * TODO: Add shard level cleanups + * TODO: Add unreferenced index metadata cleanup + *
<ul> + * <li>Deleting stale indices</li> + * <li>Deleting unreferenced root level blobs</li> + * </ul>
+ * + * @param repositoryDataGeneration Generation of {@link RepositoryData} at start of process + * @param repositoryFormatIndexVersion Repository format version + * @param listener Listener to complete when done + */ + public void cleanup( + long repositoryDataGeneration, + IndexVersion repositoryFormatIndexVersion, + ActionListener listener + ) { + createSnapshotsDeletion( + List.of(), + repositoryDataGeneration, + repositoryFormatIndexVersion, + listener.delegateFailureAndWrap((delegate, snapshotsDeletion) -> snapshotsDeletion.runCleanup(delegate)) + ); + } + private void createSnapshotsDeletion( Collection snapshotIds, long repositoryDataGeneration, @@ -890,7 +917,7 @@ private void createSnapshotsDeletion( class SnapshotsDeletion { /** - * The IDs of the snapshots to delete. + * The IDs of the snapshots to delete. This collection is empty if the deletion is a repository cleanup. */ private final Collection snapshotIds; @@ -976,7 +1003,7 @@ private record ShardSnapshotMetaDeleteResult( // --------------------------------------------------------------------------------------------------------------------------------- // The overall flow of execution - private void runDelete(SnapshotDeleteListener listener) { + void runDelete(SnapshotDeleteListener listener) { if (useShardGenerations) { // First write the new shard state metadata (with the removed snapshot) and compute deletion targets final ListenableFuture> writeShardMetaDataAndComputeDeletesStep = @@ -1009,7 +1036,7 @@ private void runDelete(SnapshotDeleteListener listener) { listener.onRepositoryDataWritten(newRepositoryData); // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion try (var refs = new RefCountingRunnable(listener::onDone)) { - cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener()); + cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener().map(ignored -> null)); cleanupUnlinkedShardLevelBlobs(writeShardMetaDataAndComputeDeletesStep.result(), refs.acquireListener()); } }, listener::onFailure)); @@ -1026,7 +1053,7 @@ private void runDelete(SnapshotDeleteListener listener) { listener.onDone(); })) { // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion - cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener()); + cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener().map(ignored -> null)); // writeIndexGen finishes on master-service thread so must fork here. snapshotExecutor.execute( @@ -1043,6 +1070,34 @@ private void runDelete(SnapshotDeleteListener listener) { } } + void runCleanup(ActionListener listener) { + final Set survivingIndexIds = originalRepositoryData.getIndices() + .values() + .stream() + .map(IndexId::getId) + .collect(Collectors.toSet()); + final List staleRootBlobs = staleRootBlobs(originalRepositoryData, originalRootBlobs.keySet()); + if (survivingIndexIds.equals(originalIndexContainers.keySet()) && staleRootBlobs.isEmpty()) { + // Nothing to clean up we return + listener.onResponse(new RepositoryCleanupResult(DeleteResult.ZERO)); + } else { + // write new index-N blob to ensure concurrent operations will fail + writeIndexGen( + originalRepositoryData, + originalRepositoryDataGeneration, + repositoryFormatIndexVersion, + Function.identity(), + listener.delegateFailureAndWrap( + // TODO should we pass newRepositoryData to cleanupStaleBlobs()? 
+ (l, newRepositoryData) -> cleanupUnlinkedRootAndIndicesBlobs( + originalRepositoryData, + l.map(RepositoryCleanupResult::new) + ) + ) + ); + } + } + // --------------------------------------------------------------------------------------------------------------------------------- // Updating the shard-level metadata and accumulating results @@ -1251,14 +1306,6 @@ private static List unusedBlobs( // --------------------------------------------------------------------------------------------------------------------------------- // Cleaning up dangling blobs - /** - * Delete any dangling blobs in the repository root (i.e. {@link RepositoryData}, {@link SnapshotInfo} and {@link Metadata} blobs) - * as well as any containers for indices that are now completely unreferenced. - */ - private void cleanupUnlinkedRootAndIndicesBlobs(RepositoryData newRepositoryData, ActionListener listener) { - cleanupStaleBlobs(snapshotIds, originalIndexContainers, originalRootBlobs, newRepositoryData, listener.map(ignored -> null)); - } - private void cleanupUnlinkedShardLevelBlobs( Collection shardDeleteResults, ActionListener listener @@ -1295,201 +1342,139 @@ private Iterator resolveFilesToDelete(Collection snapshotIds, - Map originalIndexContainers, - Map originalRootBlobs, - RepositoryData newRepositoryData, - ActionListener listener - ) { - final var blobsDeleted = new AtomicLong(); - final var bytesDeleted = new AtomicLong(); - try (var listeners = new RefCountingListener(listener.map(ignored -> DeleteResult.of(blobsDeleted.get(), bytesDeleted.get())))) { - - final List staleRootBlobs = staleRootBlobs(newRepositoryData, originalRootBlobs.keySet()); - if (staleRootBlobs.isEmpty() == false) { - staleBlobDeleteRunner.enqueueTask(listeners.acquire(ref -> { - try (ref) { - logStaleRootLevelBlobs(newRepositoryData.getGenId() - 1, snapshotIds, staleRootBlobs); - deleteFromContainer(blobContainer(), staleRootBlobs.iterator()); - for (final var staleRootBlob : staleRootBlobs) { - bytesDeleted.addAndGet(originalRootBlobs.get(staleRootBlob).length()); - } - blobsDeleted.addAndGet(staleRootBlobs.size()); - } catch (Exception e) { - logger.warn( - () -> format( - "[%s] The following blobs are no longer part of any snapshot [%s] but failed to remove them", - metadata.name(), - staleRootBlobs - ), - e - ); - } - })); - } + /** + * Cleans up stale blobs directly under the repository root as well as all indices paths that aren't referenced by any existing + * snapshots. This method is only to be called directly after a new {@link RepositoryData} was written to the repository. 
+ * + * @param newRepositoryData new repository data that was just written + * @param listener listener to invoke with the combined {@link DeleteResult} of all blobs removed in this operation + */ + private void cleanupUnlinkedRootAndIndicesBlobs(RepositoryData newRepositoryData, ActionListener listener) { + final var blobsDeleted = new AtomicLong(); + final var bytesDeleted = new AtomicLong(); + try ( + var listeners = new RefCountingListener(listener.map(ignored -> DeleteResult.of(blobsDeleted.get(), bytesDeleted.get()))) + ) { - final var survivingIndexIds = newRepositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet()); - for (final var indexEntry : originalIndexContainers.entrySet()) { - final var indexId = indexEntry.getKey(); - if (survivingIndexIds.contains(indexId)) { - continue; + final List staleRootBlobs = staleRootBlobs(newRepositoryData, originalRootBlobs.keySet()); + if (staleRootBlobs.isEmpty() == false) { + staleBlobDeleteRunner.enqueueTask(listeners.acquire(ref -> { + try (ref) { + logStaleRootLevelBlobs(newRepositoryData.getGenId() - 1, snapshotIds, staleRootBlobs); + deleteFromContainer(blobContainer(), staleRootBlobs.iterator()); + for (final var staleRootBlob : staleRootBlobs) { + bytesDeleted.addAndGet(originalRootBlobs.get(staleRootBlob).length()); + } + blobsDeleted.addAndGet(staleRootBlobs.size()); + } catch (Exception e) { + logger.warn( + () -> format( + "[%s] The following blobs are no longer part of any snapshot [%s] but failed to remove them", + metadata.name(), + staleRootBlobs + ), + e + ); + } + })); } - staleBlobDeleteRunner.enqueueTask(listeners.acquire(ref -> { - try (ref) { - logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexId); - final var deleteResult = indexEntry.getValue().delete(OperationPurpose.SNAPSHOT); - blobsDeleted.addAndGet(deleteResult.blobsDeleted()); - bytesDeleted.addAndGet(deleteResult.bytesDeleted()); - logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexId); - } catch (IOException e) { - logger.warn(() -> format(""" - [%s] index %s is no longer part of any snapshot in the repository, \ - but failed to clean up its index folder""", metadata.name(), indexId), e); + + final var survivingIndexIds = newRepositoryData.getIndices() + .values() + .stream() + .map(IndexId::getId) + .collect(Collectors.toSet()); + for (final var indexEntry : originalIndexContainers.entrySet()) { + final var indexId = indexEntry.getKey(); + if (survivingIndexIds.contains(indexId)) { + continue; } - })); + staleBlobDeleteRunner.enqueueTask(listeners.acquire(ref -> { + try (ref) { + logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexId); + final var deleteResult = indexEntry.getValue().delete(OperationPurpose.SNAPSHOT); + blobsDeleted.addAndGet(deleteResult.blobsDeleted()); + bytesDeleted.addAndGet(deleteResult.bytesDeleted()); + logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexId); + } catch (IOException e) { + logger.warn(() -> format(""" + [%s] index %s is no longer part of any snapshot in the repository, \ + but failed to clean up its index folder""", metadata.name(), indexId), e); + } + })); + } } - } - // If we did the cleanup of stale indices purely using a throttled executor then there would be no backpressure to prevent us from - // falling arbitrarily far behind. 
But nor do we want to dedicate all the SNAPSHOT threads to stale index cleanups because that - // would slow down other snapshot operations in situations that do not need backpressure. - // - // The solution is to dedicate one SNAPSHOT thread to doing the cleanups eagerly, alongside the throttled executor which spreads - // the rest of the work across the other threads if they are free. If the eager cleanup loop doesn't finish before the next one - // starts then we dedicate another SNAPSHOT thread to the deletions, and so on, until eventually either we catch up or the SNAPSHOT - // pool is fully occupied with blob deletions, which pushes back on other snapshot operations. + // If we did the cleanup of stale indices purely using a throttled executor then there would be no backpressure to prevent us + // from falling arbitrarily far behind. But nor do we want to dedicate all the SNAPSHOT threads to stale index cleanups because + // that would slow down other snapshot operations in situations that do not need backpressure. + // + // The solution is to dedicate one SNAPSHOT thread to doing the cleanups eagerly, alongside the throttled executor which spreads + // the rest of the work across the other threads if they are free. If the eager cleanup loop doesn't finish before the next one + // starts then we dedicate another SNAPSHOT thread to the deletions, and so on, until eventually either we catch up or the + // SNAPSHOT pool is fully occupied with blob deletions, which pushes back on other snapshot operations. - staleBlobDeleteRunner.runSyncTasksEagerly(threadPool.executor(ThreadPool.Names.SNAPSHOT)); - } + staleBlobDeleteRunner.runSyncTasksEagerly(snapshotExecutor); + } - /** - * Runs cleanup actions on the repository. Increments the repository state id by one before executing any modifications on the - * repository. - * TODO: Add shard level cleanups - * TODO: Add unreferenced index metadata cleanup - *
<ul> - * <li>Deleting stale indices</li> - * <li>Deleting unreferenced root level blobs</li> - * </ul>
- * @param originalRepositoryDataGeneration Current repository state id - * @param repositoryFormatIndexVersion version of the updated repository metadata to write - * @param listener Listener to complete when done - */ - public void cleanup( - long originalRepositoryDataGeneration, - IndexVersion repositoryFormatIndexVersion, - ActionListener listener - ) { - try { - if (isReadOnly()) { - throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository"); - } - Map originalRootBlobs = blobContainer().listBlobs(OperationPurpose.SNAPSHOT); - final RepositoryData originalRepositoryData = safeRepositoryData(originalRepositoryDataGeneration, originalRootBlobs); - final Map originalIndexContainers = blobStore().blobContainer(indicesPath()) - .children(OperationPurpose.SNAPSHOT); - final Set survivingIndexIds = originalRepositoryData.getIndices() - .values() + // Finds all blobs directly under the repository root path that are not referenced by the current RepositoryData + private static List staleRootBlobs(RepositoryData newRepositoryData, Set originalRootBlobNames) { + final Set allSnapshotIds = newRepositoryData.getSnapshotIds() .stream() - .map(IndexId::getId) + .map(SnapshotId::getUUID) .collect(Collectors.toSet()); - final List staleRootBlobs = staleRootBlobs(originalRepositoryData, originalRootBlobs.keySet()); - if (survivingIndexIds.equals(originalIndexContainers.keySet()) && staleRootBlobs.isEmpty()) { - // Nothing to clean up we return - listener.onResponse(new RepositoryCleanupResult(DeleteResult.ZERO)); - } else { - // write new index-N blob to ensure concurrent operations will fail - writeIndexGen( - originalRepositoryData, - originalRepositoryDataGeneration, - repositoryFormatIndexVersion, - Function.identity(), - listener.delegateFailureAndWrap( - (l, v) -> cleanupStaleBlobs( - Collections.emptyList(), - originalIndexContainers, - originalRootBlobs, - originalRepositoryData, - l.map(RepositoryCleanupResult::new) - ) - ) - ); - } - } catch (Exception e) { - listener.onFailure(e); - } - } - - // Finds all blobs directly under the repository root path that are not referenced by the current RepositoryData - private static List staleRootBlobs(RepositoryData originalRepositoryData, Set originalRootBlobNames) { - final Set allSnapshotIds = originalRepositoryData.getSnapshotIds() - .stream() - .map(SnapshotId::getUUID) - .collect(Collectors.toSet()); - return originalRootBlobNames.stream().filter(blob -> { - if (FsBlobContainer.isTempBlobName(blob)) { - return true; - } - if (blob.endsWith(".dat")) { - final String foundUUID; - if (blob.startsWith(SNAPSHOT_PREFIX)) { - foundUUID = blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length()); - assert SNAPSHOT_FORMAT.blobName(foundUUID).equals(blob); - } else if (blob.startsWith(METADATA_PREFIX)) { - foundUUID = blob.substring(METADATA_PREFIX.length(), blob.length() - ".dat".length()); - assert GLOBAL_METADATA_FORMAT.blobName(foundUUID).equals(blob); - } else { - return false; + return originalRootBlobNames.stream().filter(blob -> { + if (FsBlobContainer.isTempBlobName(blob)) { + return true; } - return allSnapshotIds.contains(foundUUID) == false; - } else if (blob.startsWith(INDEX_FILE_PREFIX)) { - // TODO: Include the current generation here once we remove keeping index-(N-1) around from #writeIndexGen - try { - return originalRepositoryData.getGenId() > Long.parseLong(blob.substring(INDEX_FILE_PREFIX.length())); - } catch (NumberFormatException nfe) { - // odd case of an extra file with the index- 
prefix that we can't identify - return false; + if (blob.endsWith(".dat")) { + final String foundUUID; + if (blob.startsWith(SNAPSHOT_PREFIX)) { + foundUUID = blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length()); + assert SNAPSHOT_FORMAT.blobName(foundUUID).equals(blob); + } else if (blob.startsWith(METADATA_PREFIX)) { + foundUUID = blob.substring(METADATA_PREFIX.length(), blob.length() - ".dat".length()); + assert GLOBAL_METADATA_FORMAT.blobName(foundUUID).equals(blob); + } else { + return false; + } + return allSnapshotIds.contains(foundUUID) == false; + } else if (blob.startsWith(INDEX_FILE_PREFIX)) { + // TODO: Include the current generation here once we remove keeping index-(N-1) around from #writeIndexGen + try { + return newRepositoryData.getGenId() > Long.parseLong(blob.substring(INDEX_FILE_PREFIX.length())); + } catch (NumberFormatException nfe) { + // odd case of an extra file with the index- prefix that we can't identify + return false; + } } - } - return false; - }).toList(); - } + return false; + }).toList(); + } - private void logStaleRootLevelBlobs( - long originalRepositoryDataGeneration, - Collection snapshotIds, - List blobsToDelete - ) { - if (logger.isInfoEnabled()) { - // If we're running root level cleanup as part of a snapshot delete we should not log the snapshot- and global metadata - // blobs associated with the just deleted snapshots as they are expected to exist and not stale. Otherwise every snapshot - // delete would also log a confusing INFO message about "stale blobs". - final Set blobNamesToIgnore = snapshotIds.stream() - .flatMap( - snapshotId -> Stream.of( - GLOBAL_METADATA_FORMAT.blobName(snapshotId.getUUID()), - SNAPSHOT_FORMAT.blobName(snapshotId.getUUID()), - INDEX_FILE_PREFIX + originalRepositoryDataGeneration + private void logStaleRootLevelBlobs( + long newestStaleRepositoryDataGeneration, + Collection snapshotIds, + List blobsToDelete + ) { + if (logger.isInfoEnabled()) { + // If we're running root level cleanup as part of a snapshot delete we should not log the snapshot- and global metadata + // blobs associated with the just deleted snapshots as they are expected to exist and not stale. Otherwise every snapshot + // delete would also log a confusing INFO message about "stale blobs". + final Set blobNamesToIgnore = snapshotIds.stream() + .flatMap( + snapshotId -> Stream.of( + GLOBAL_METADATA_FORMAT.blobName(snapshotId.getUUID()), + SNAPSHOT_FORMAT.blobName(snapshotId.getUUID()), + INDEX_FILE_PREFIX + newestStaleRepositoryDataGeneration + ) ) - ) - .collect(Collectors.toSet()); - final List blobsToLog = blobsToDelete.stream().filter(b -> blobNamesToIgnore.contains(b) == false).toList(); - if (blobsToLog.isEmpty() == false) { - logger.info("[{}] Found stale root level blobs {}. Cleaning them up", metadata.name(), blobsToLog); + .collect(Collectors.toSet()); + final List blobsToLog = blobsToDelete.stream().filter(b -> blobNamesToIgnore.contains(b) == false).toList(); + if (blobsToLog.isEmpty() == false) { + logger.info("[{}] Found stale root level blobs {}. 
Cleaning them up", metadata.name(), blobsToLog); + } } } } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapperTests.java index 2899dab6ff303..6d562f88a0100 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapperTests.java @@ -20,7 +20,9 @@ import org.apache.lucene.search.FieldExistsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.codec.PerFieldMapperCodec; @@ -231,6 +233,26 @@ public void testDims() { } } + public void testMergeDims() throws IOException { + XContentBuilder mapping = mapping(b -> { + b.startObject("field"); + b.field("type", "dense_vector"); + b.endObject(); + }); + MapperService mapperService = createMapperService(mapping); + + mapping = mapping(b -> { + b.startObject("field"); + b.field("type", "dense_vector").field("dims", 4).field("similarity", "cosine").field("index", true); + b.endObject(); + }); + merge(mapperService, mapping); + assertEquals( + XContentHelper.convertToMap(BytesReference.bytes(mapping), false, mapping.contentType()).v2(), + XContentHelper.convertToMap(mapperService.documentMapper().mappingSource().uncompressed(), false, mapping.contentType()).v2() + ); + } + public void testDefaults() throws Exception { DocumentMapper mapper = createDocumentMapper(fieldMapping(b -> b.field("type", "dense_vector").field("dims", 3))); @@ -391,6 +413,40 @@ public void testCosineWithZeroByteVector() throws Exception { ); } + public void testMaxInnerProductWithValidNorm() throws Exception { + DocumentMapper mapper = createDocumentMapper( + fieldMapping( + b -> b.field("type", "dense_vector") + .field("dims", 3) + .field("index", true) + .field("similarity", VectorSimilarity.MAX_INNER_PRODUCT) + ) + ); + float[] vector = { -12.1f, 2.7f, -4 }; + // Shouldn't throw + mapper.parse(source(b -> b.array("field", vector))); + } + + public void testWithExtremeFloatVector() throws Exception { + for (VectorSimilarity vs : List.of(VectorSimilarity.COSINE, VectorSimilarity.DOT_PRODUCT, VectorSimilarity.COSINE)) { + DocumentMapper mapper = createDocumentMapper( + fieldMapping(b -> b.field("type", "dense_vector").field("dims", 3).field("index", true).field("similarity", vs)) + ); + float[] vector = { 0.07247924f, -4.310546E-11f, -1.7255947E30f }; + DocumentParsingException e = expectThrows( + DocumentParsingException.class, + () -> mapper.parse(source(b -> b.array("field", vector))) + ); + assertNotNull(e.getCause()); + assertThat( + e.getCause().getMessage(), + containsString( + "NaN or Infinite magnitude detected, this usually means the vector values are too extreme to fit within a float." 
+ ) + ); + } + } + public void testInvalidParameters() { MapperParsingException e = expectThrows( MapperParsingException.class, diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 17f2303eb84c8..ab9d80b801863 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -1509,7 +1509,7 @@ public MatchingDirectoryReader(DirectoryReader in, Query query) throws IOExcepti @Override public LeafReader wrap(LeafReader leaf) { try { - final IndexSearcher searcher = newSearcher(leaf, false, true, false); + final IndexSearcher searcher = new IndexSearcher(leaf); searcher.setQueryCache(null); final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f); final Scorer scorer = weight.scorer(leaf.getContext()); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/KibanaOwnedReservedRoleDescriptors.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/KibanaOwnedReservedRoleDescriptors.java index 579638f474b21..dcd7e106b2e81 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/KibanaOwnedReservedRoleDescriptors.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/KibanaOwnedReservedRoleDescriptors.java @@ -194,7 +194,10 @@ static RoleDescriptor kibanaSystem(String name) { // Fleet telemetry queries Agent Logs indices in kibana task runner RoleDescriptor.IndicesPrivileges.builder().indices("logs-elastic_agent*").privileges("read").build(), // Fleet publishes Agent metrics in kibana task runner - RoleDescriptor.IndicesPrivileges.builder().indices("metrics-fleet_server*").privileges("auto_configure", "write").build(), + RoleDescriptor.IndicesPrivileges.builder() + .indices("metrics-fleet_server*") + .privileges("auto_configure", "read", "write", "delete") + .build(), // Legacy "Alerts as data" used in Security Solution. // Kibana user creates these indices; reads / writes to them. 
RoleDescriptor.IndicesPrivileges.builder().indices(ReservedRolesStore.ALERTS_LEGACY_INDEX).privileges("all").build(), diff --git a/x-pack/plugin/eql/qa/correctness/build.gradle b/x-pack/plugin/eql/qa/correctness/build.gradle index 4a72f66c238e3..0008c30f260d6 100644 --- a/x-pack/plugin/eql/qa/correctness/build.gradle +++ b/x-pack/plugin/eql/qa/correctness/build.gradle @@ -14,7 +14,7 @@ dependencies { } File serviceAccountFile = providers.environmentVariable('eql_test_credentials_file') - .orElse(providers.systemProperty('eql.test.credentials.file').forUseAtConfigurationTime()) + .orElse(providers.systemProperty('eql.test.credentials.file')) .map { s -> new File(s)} .getOrNull() diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java index f6156507dffa2..c322520d8853b 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.core.Releasables; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; @@ -31,11 +32,12 @@ * 2 | 2 | "foo" * */ -public class MvExpandOperator extends AbstractPageMappingOperator { - public record Factory(int channel) implements OperatorFactory { +public class MvExpandOperator implements Operator { + + public record Factory(int channel, int blockSize) implements OperatorFactory { @Override public Operator get(DriverContext driverContext) { - return new MvExpandOperator(channel); + return new MvExpandOperator(channel, blockSize); } @Override @@ -46,49 +48,158 @@ public String describe() { private final int channel; + private final int pageSize; + private int noops; - public MvExpandOperator(int channel) { + private Page prev; + private boolean prevCompleted = false; + private boolean finished = false; + + private Block expandingBlock; + private Block expandedBlock; + + private int nextPositionToProcess = 0; + private int nextMvToProcess = 0; + private int nextItemOnExpanded = 0; + + /** + * Count of pages that have been processed by this operator. 
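+ * Together with pagesOut, this is reported in the operator's Status as "pages_in" / "pages_out".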
+ */ + private int pagesIn; + private int pagesOut; + + public MvExpandOperator(int channel, int pageSize) { this.channel = channel; + this.pageSize = pageSize; + assert pageSize > 0; } @Override - protected Page process(Page page) { - Block expandingBlock = page.getBlock(channel); - Block expandedBlock = expandingBlock.expand(); + public final Page getOutput() { + if (prev == null) { + return null; + } + pagesOut++; + if (prev.getPositionCount() == 0 || expandingBlock.mayHaveMultivaluedFields() == false) { + noops++; + Page result = prev; + prev = null; + return result; + } + + try { + return process(); + } finally { + if (prevCompleted && prev != null) { + prev.releaseBlocks(); + prev = null; + } + } + } + + protected Page process() { if (expandedBlock == expandingBlock) { noops++; - return page; + prevCompleted = true; + return prev; } - if (page.getBlockCount() == 1) { + if (prev.getBlockCount() == 1) { assert channel == 0; + prevCompleted = true; return new Page(expandedBlock); } - int[] duplicateFilter = buildDuplicateExpandingFilter(expandingBlock, expandedBlock.getPositionCount()); + int[] duplicateFilter = nextDuplicateExpandingFilter(); - Block[] result = new Block[page.getBlockCount()]; + Block[] result = new Block[prev.getBlockCount()]; + int[] expandedMask = new int[duplicateFilter.length]; + for (int i = 0; i < expandedMask.length; i++) { + expandedMask[i] = i + nextItemOnExpanded; + } + nextItemOnExpanded += expandedMask.length; for (int b = 0; b < result.length; b++) { - result[b] = b == channel ? expandedBlock : page.getBlock(b).filter(duplicateFilter); + result[b] = b == channel ? expandedBlock.filter(expandedMask) : prev.getBlock(b).filter(duplicateFilter); + } + if (nextItemOnExpanded == expandedBlock.getPositionCount()) { + nextItemOnExpanded = 0; } return new Page(result); } - private int[] buildDuplicateExpandingFilter(Block expandingBlock, int newPositions) { - int[] duplicateFilter = new int[newPositions]; + private int[] nextDuplicateExpandingFilter() { + int[] duplicateFilter = new int[Math.min(pageSize, expandedBlock.getPositionCount() - nextPositionToProcess)]; int n = 0; - for (int p = 0; p < expandingBlock.getPositionCount(); p++) { - int count = expandingBlock.getValueCount(p); + while (true) { + int count = expandingBlock.getValueCount(nextPositionToProcess); int positions = count == 0 ? 1 : count; - Arrays.fill(duplicateFilter, n, n + positions, p); - n += positions; + int toAdd = Math.min(pageSize - n, positions - nextMvToProcess); + Arrays.fill(duplicateFilter, n, n + toAdd, nextPositionToProcess); + n += toAdd; + + if (n == pageSize) { + if (nextMvToProcess + toAdd == positions) { + // finished expanding this position, let's move on to next position (that will be expanded with next call) + nextMvToProcess = 0; + nextPositionToProcess++; + if (nextPositionToProcess == expandingBlock.getPositionCount()) { + nextPositionToProcess = 0; + prevCompleted = true; + } + } else { + // there are still items to expand in current position, but the duplicate filter is full, so we'll deal with them at + // next call + nextMvToProcess = nextMvToProcess + toAdd; + } + return duplicateFilter; + } + + nextMvToProcess = 0; + nextPositionToProcess++; + if (nextPositionToProcess == expandingBlock.getPositionCount()) { + nextPositionToProcess = 0; + nextMvToProcess = 0; + prevCompleted = true; + return n < pageSize ? 
Arrays.copyOfRange(duplicateFilter, 0, n) : duplicateFilter; + } } - return duplicateFilter; } @Override - protected AbstractPageMappingOperator.Status status(int pagesProcessed) { - return new Status(pagesProcessed, noops); + public final boolean needsInput() { + return prev == null && finished == false; + } + + @Override + public final void addInput(Page page) { + assert prev == null : "has pending input page"; + prev = page; + this.expandingBlock = prev.getBlock(channel); + this.expandedBlock = expandingBlock.expand(); + pagesIn++; + prevCompleted = false; + } + + @Override + public final void finish() { + finished = true; + } + + @Override + public final boolean isFinished() { + return finished && prev == null; + } + + @Override + public final Status status() { + return new Status(pagesIn, pagesOut, noops); + } + + @Override + public void close() { + if (prev != null) { + Releasables.closeExpectNoException(() -> prev.releaseBlocks()); + } } @Override @@ -96,35 +207,42 @@ public String toString() { return "MvExpandOperator[channel=" + channel + "]"; } - public static final class Status extends AbstractPageMappingOperator.Status { + public static final class Status implements Operator.Status { + + private final int pagesIn; + private final int pagesOut; + private final int noops; + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( Operator.Status.class, "mv_expand", Status::new ); - private final int noops; - - Status(int pagesProcessed, int noops) { - super(pagesProcessed); + Status(int pagesIn, int pagesOut, int noops) { + this.pagesIn = pagesIn; + this.pagesOut = pagesOut; this.noops = noops; } Status(StreamInput in) throws IOException { - super(in); + pagesIn = in.readVInt(); + pagesOut = in.readVInt(); noops = in.readVInt(); } @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + out.writeVInt(pagesIn); + out.writeVInt(pagesOut); out.writeVInt(noops); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field("pages_processed", pagesProcessed()); + builder.field("pages_in", pagesIn); + builder.field("pages_out", pagesOut); builder.field("noops", noops); return builder.endObject(); } @@ -147,12 +265,20 @@ public boolean equals(Object o) { return false; } Status status = (Status) o; - return noops == status.noops && pagesProcessed() == status.pagesProcessed(); + return noops == status.noops && pagesIn == status.pagesIn && pagesOut == status.pagesOut; + } + + public int pagesIn() { + return pagesIn; + } + + public int pagesOut() { + return pagesOut; } @Override public int hashCode() { - return Objects.hash(noops, pagesProcessed()); + return Objects.hash(noops, pagesIn, pagesOut); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java index 930ced04636f8..df6c09ea1ff97 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java @@ -41,13 +41,12 @@ final class ExchangeBuffer { } void addPage(Page page) { + queue.add(page); + if (queueSize.incrementAndGet() == 1) { + notifyNotEmpty(); + } if (noMoreInputs) { - page.releaseBlocks(); - } else { - queue.add(page); - if 
(queueSize.incrementAndGet() == 1) { - notifyNotEmpty(); - } + discardPages(); } } @@ -115,13 +114,17 @@ SubscribableListener waitForReading() { } } + private void discardPages() { + Page p; + while ((p = pollPage()) != null) { + p.releaseBlocks(); + } + } + void finish(boolean drainingPages) { noMoreInputs = true; if (drainingPages) { - Page p; - while ((p = pollPage()) != null) { - p.releaseBlocks(); - } + discardPages(); } notifyNotEmpty(); if (drainingPages || queueSize.get() == 0) { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorStatusTests.java index fe281bbf16131..9527388a0d3cf 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorStatusTests.java @@ -16,12 +16,12 @@ public class MvExpandOperatorStatusTests extends AbstractWireSerializingTestCase { public static MvExpandOperator.Status simple() { - return new MvExpandOperator.Status(10, 9); + return new MvExpandOperator.Status(10, 15, 9); } public static String simpleToJson() { return """ - {"pages_processed":10,"noops":9}"""; + {"pages_in":10,"pages_out":15,"noops":9}"""; } public void testToXContent() { @@ -35,20 +35,28 @@ protected Writeable.Reader instanceReader() { @Override public MvExpandOperator.Status createTestInstance() { - return new MvExpandOperator.Status(randomNonNegativeInt(), randomNonNegativeInt()); + return new MvExpandOperator.Status(randomNonNegativeInt(), randomNonNegativeInt(), randomNonNegativeInt()); } @Override protected MvExpandOperator.Status mutateInstance(MvExpandOperator.Status instance) { - switch (between(0, 1)) { + switch (between(0, 2)) { case 0: return new MvExpandOperator.Status( - randomValueOtherThan(instance.pagesProcessed(), ESTestCase::randomNonNegativeInt), + randomValueOtherThan(instance.pagesIn(), ESTestCase::randomNonNegativeInt), + instance.pagesOut(), instance.noops() ); case 1: return new MvExpandOperator.Status( - instance.pagesProcessed(), + instance.pagesIn(), + randomValueOtherThan(instance.pagesOut(), ESTestCase::randomNonNegativeInt), + instance.noops() + ); + case 2: + return new MvExpandOperator.Status( + instance.pagesIn(), + instance.pagesOut(), randomValueOtherThan(instance.noops(), ESTestCase::randomNonNegativeInt) ); default: diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java index 69c965fc91323..f99685609ff78 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java @@ -9,17 +9,19 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.compute.data.BasicBlockTests; +import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.Page; +import java.util.Iterator; import java.util.List; import static 
org.elasticsearch.compute.data.BasicBlockTests.randomBlock; import static org.elasticsearch.compute.data.BasicBlockTests.valuesAtPositions; +import static org.elasticsearch.compute.data.BlockTestUtils.deepCopyOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -47,7 +49,7 @@ protected Page createPage(int positionOffset, int length) { @Override protected Operator.OperatorFactory simple(BigArrays bigArrays) { - return new MvExpandOperator.Factory(0); + return new MvExpandOperator.Factory(0, randomIntBetween(1, 1000)); } @Override @@ -60,47 +62,143 @@ protected String expectedToStringOfSimple() { return expectedDescriptionOfSimple(); } - @Override - protected void assertSimpleOutput(List input, List results) { - assertThat(results, hasSize(results.size())); - for (int i = 0; i < results.size(); i++) { - IntBlock origExpanded = input.get(i).getBlock(0); - IntBlock resultExpanded = results.get(i).getBlock(0); - int np = 0; - for (int op = 0; op < origExpanded.getPositionCount(); op++) { - if (origExpanded.isNull(op)) { - assertThat(resultExpanded.isNull(np), equalTo(true)); - assertThat(resultExpanded.getValueCount(np++), equalTo(0)); - continue; - } - List oValues = BasicBlockTests.valuesAtPositions(origExpanded, op, op + 1).get(0); - for (Object ov : oValues) { - assertThat(resultExpanded.isNull(np), equalTo(false)); - assertThat(resultExpanded.getValueCount(np), equalTo(1)); - assertThat(BasicBlockTests.valuesAtPositions(resultExpanded, np, ++np).get(0), equalTo(List.of(ov))); + class BlockListIterator implements Iterator { + private final Iterator pagesIterator; + private final int channel; + private Block currentBlock; + private int nextPosition; + + BlockListIterator(List pages, int channel) { + this.pagesIterator = pages.iterator(); + this.channel = channel; + this.currentBlock = pagesIterator.next().getBlock(channel); + this.nextPosition = 0; + } + + @Override + public boolean hasNext() { + if (currentBlock == null) { + return false; + } + + return currentBlock.getValueCount(nextPosition) == 0 + || nextPosition < currentBlock.getPositionCount() + || pagesIterator.hasNext(); + } + + @Override + public Object next() { + if (currentBlock != null && currentBlock.getValueCount(nextPosition) == 0) { + nextPosition++; + if (currentBlock.getPositionCount() == nextPosition) { + loadNextBlock(); } + return null; } + List items = valuesAtPositions(currentBlock, nextPosition, nextPosition + 1).get(0); + nextPosition++; + if (currentBlock.getPositionCount() == nextPosition) { + loadNextBlock(); + } + return items.size() == 1 ? items.get(0) : items; + } - IntBlock origDuplicated = input.get(i).getBlock(1); - IntBlock resultDuplicated = results.get(i).getBlock(1); - np = 0; - for (int op = 0; op < origDuplicated.getPositionCount(); op++) { - int copies = origExpanded.isNull(op) ? 
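/* a null expanding position still produces exactly one (null) output row */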
1 : origExpanded.getValueCount(op); - for (int c = 0; c < copies; c++) { - if (origDuplicated.isNull(op)) { - assertThat(resultDuplicated.isNull(np), equalTo(true)); - assertThat(resultDuplicated.getValueCount(np++), equalTo(0)); - continue; - } - assertThat(resultDuplicated.isNull(np), equalTo(false)); - assertThat(resultDuplicated.getValueCount(np), equalTo(origDuplicated.getValueCount(op))); - assertThat( - BasicBlockTests.valuesAtPositions(resultDuplicated, np, ++np).get(0), - equalTo(BasicBlockTests.valuesAtPositions(origDuplicated, op, op + 1).get(0)) - ); + private void loadNextBlock() { + if (pagesIterator.hasNext() == false) { + currentBlock = null; + return; + } + this.currentBlock = pagesIterator.next().getBlock(channel); + nextPosition = 0; + } + } + + class BlockListIteratorExpander implements Iterator { + private final Iterator pagesIterator; + private final int channel; + private Block currentBlock; + private int nextPosition; + private int nextInPosition; + + BlockListIteratorExpander(List pages, int channel) { + this.pagesIterator = pages.iterator(); + this.channel = channel; + this.currentBlock = pagesIterator.next().getBlock(channel); + this.nextPosition = 0; + this.nextInPosition = 0; + } + + @Override + public boolean hasNext() { + if (currentBlock == null) { + return false; + } + + return currentBlock.getValueCount(nextPosition) == 0 + || nextInPosition < currentBlock.getValueCount(nextPosition) + || nextPosition < currentBlock.getPositionCount() + || pagesIterator.hasNext(); + } + + @Override + public Object next() { + if (currentBlock != null && currentBlock.getValueCount(nextPosition) == 0) { + nextPosition++; + if (currentBlock.getPositionCount() == nextPosition) { + loadNextBlock(); } + return null; + } + List items = valuesAtPositions(currentBlock, nextPosition, nextPosition + 1).get(0); + Object result = items == null ? null : items.get(nextInPosition++); + if (nextInPosition == currentBlock.getValueCount(nextPosition)) { + nextPosition++; + nextInPosition = 0; + } + if (currentBlock.getPositionCount() == nextPosition) { + loadNextBlock(); + } + return result; + } + + private void loadNextBlock() { + if (pagesIterator.hasNext() == false) { + currentBlock = null; + return; + } + this.currentBlock = pagesIterator.next().getBlock(channel); + nextPosition = 0; + nextInPosition = 0; + } + } + + @Override + protected void assertSimpleOutput(List input, List results) { + assertThat(results, hasSize(results.size())); + + var inputIter = new BlockListIteratorExpander(input, 0); + var resultIter = new BlockListIteratorExpander(results, 0); + + while (inputIter.hasNext()) { + assertThat(resultIter.hasNext(), equalTo(true)); + assertThat(resultIter.next(), equalTo(inputIter.next())); + } + assertThat(resultIter.hasNext(), equalTo(false)); + + var originalMvIter = new BlockListIterator(input, 0); + var inputIter2 = new BlockListIterator(input, 1); + var resultIter2 = new BlockListIterator(results, 1); + + while (originalMvIter.hasNext()) { + Object originalMv = originalMvIter.next(); + int originalMvSize = originalMv instanceof List l ? 
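/* each value of a multivalued entry repeats the companion column once */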
l.size() : 1; + assertThat(resultIter2.hasNext(), equalTo(true)); + Object inputValue = inputIter2.next(); + for (int j = 0; j < originalMvSize; j++) { + assertThat(resultIter2.next(), equalTo(inputValue)); } } + assertThat(resultIter2.hasNext(), equalTo(false)); } @Override @@ -110,7 +208,7 @@ protected ByteSizeValue smallEnoughToCircuitBreak() { } public void testNoopStatus() { - MvExpandOperator op = new MvExpandOperator(0); + MvExpandOperator op = new MvExpandOperator(0, randomIntBetween(1, 1000)); List result = drive( op, List.of(new Page(IntVector.newVectorBuilder(2).appendInt(1).appendInt(2).build().asBlock())).iterator(), @@ -118,26 +216,45 @@ public void testNoopStatus() { ); assertThat(result, hasSize(1)); assertThat(valuesAtPositions(result.get(0).getBlock(0), 0, 2), equalTo(List.of(List.of(1), List.of(2)))); - MvExpandOperator.Status status = (MvExpandOperator.Status) op.status(); - assertThat(status.pagesProcessed(), equalTo(1)); + MvExpandOperator.Status status = op.status(); + assertThat(status.pagesIn(), equalTo(1)); + assertThat(status.pagesOut(), equalTo(1)); assertThat(status.noops(), equalTo(1)); } public void testExpandStatus() { - MvExpandOperator op = new MvExpandOperator(0); + MvExpandOperator op = new MvExpandOperator(0, randomIntBetween(1, 1)); var builder = IntBlock.newBlockBuilder(2).beginPositionEntry().appendInt(1).appendInt(2).endPositionEntry(); List result = drive(op, List.of(new Page(builder.build())).iterator(), driverContext()); assertThat(result, hasSize(1)); assertThat(valuesAtPositions(result.get(0).getBlock(0), 0, 2), equalTo(List.of(List.of(1), List.of(2)))); - MvExpandOperator.Status status = (MvExpandOperator.Status) op.status(); - assertThat(status.pagesProcessed(), equalTo(1)); + MvExpandOperator.Status status = op.status(); + assertThat(status.pagesIn(), equalTo(1)); + assertThat(status.pagesOut(), equalTo(1)); assertThat(status.noops(), equalTo(0)); } - // TODO: remove this once possible - // https://github.com/elastic/elasticsearch/issues/99826 - @Override - protected boolean canLeak() { - return true; + public void testExpandWithBytesRefs() { + DriverContext context = driverContext(); + List input = CannedSourceOperator.collectPages(new AbstractBlockSourceOperator(context.blockFactory(), 8 * 1024) { + private int idx; + + @Override + protected int remaining() { + return 10000 - idx; + } + + @Override + protected Page createPage(int positionOffset, int length) { + idx += length; + return new Page( + randomBlock(context.blockFactory(), ElementType.BYTES_REF, length, true, 1, 10, 0, 0).block(), + randomBlock(context.blockFactory(), ElementType.INT, length, false, 1, 10, 0, 0).block() + ); + } + }); + List origInput = deepCopyOf(input, BlockFactory.getNonBreakingInstance()); + List results = drive(new MvExpandOperator(0, randomIntBetween(1, 1000)), input.iterator(), context); + assertSimpleOutput(origInput, results); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java index 63f601669636c..5d881f03bd07f 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java @@ -212,7 +212,7 @@ protected final void assertSimple(DriverContext context, int size) { unreleasedInputs++; } } - if ((canLeak() == false) && unreleasedInputs > 0) { + if 
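/* with the canLeak escape hatch removed, any unreleased input block is now a hard failure */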
(unreleasedInputs > 0) { throw new AssertionError("[" + unreleasedInputs + "] unreleased input blocks"); } } @@ -308,12 +308,6 @@ protected void start(Driver driver, ActionListener driverListener) { } } - // TODO: Remove this once all operators do not leak anymore - // https://github.com/elastic/elasticsearch/issues/99826 - protected boolean canLeak() { - return false; - } - public static void assertDriverContext(DriverContext driverContext) { assertTrue(driverContext.isFinished()); assertThat(driverContext.getSnapshot().releasables(), empty()); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java new file mode 100644 index 0000000000000..4c975c6c07834 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator.exchange; + +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.compute.data.BasicBlockTests; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.MockBlockFactory; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.test.ESTestCase; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.Matchers.equalTo; + +public class ExchangeBufferTests extends ESTestCase { + + public void testDrainPages() throws Exception { + ExchangeBuffer buffer = new ExchangeBuffer(randomIntBetween(10, 1000)); + var blockFactory = blockFactory(); + CountDownLatch latch = new CountDownLatch(1); + Thread[] producers = new Thread[between(1, 4)]; + AtomicBoolean stopped = new AtomicBoolean(); + AtomicInteger addedPages = new AtomicInteger(); + for (int t = 0; t < producers.length; t++) { + producers[t] = new Thread(() -> { + try { + latch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + while (stopped.get() == false && addedPages.incrementAndGet() < 10_000) { + buffer.addPage(randomPage(blockFactory)); + } + }); + producers[t].start(); + } + latch.countDown(); + try { + int minPage = between(10, 100); + int receivedPage = 0; + while (receivedPage < minPage) { + Page p = buffer.pollPage(); + if (p != null) { + p.releaseBlocks(); + ++receivedPage; + } + } + } finally { + buffer.finish(true); + stopped.set(true); + } + for (Thread t : producers) { + t.join(); + } + assertThat(buffer.size(), equalTo(0)); + blockFactory.ensureAllBlocksAreReleased(); + } + + private static MockBlockFactory blockFactory() { + BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking(); + CircuitBreaker 
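/* route allocations through the request breaker so unreleased pages are detected by ensureAllBlocksAreReleased() */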
breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST); + return new MockBlockFactory(breaker, bigArrays); + } + + private static Page randomPage(BlockFactory blockFactory) { + Block block = BasicBlockTests.randomBlock( + blockFactory, + randomFrom(ElementType.LONG, ElementType.BYTES_REF, ElementType.BOOLEAN), + randomIntBetween(1, 100), + randomBoolean(), + 0, + between(1, 2), + 0, + between(1, 2) + ).block(); + return new Page(block); + } +} diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mv_expand.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mv_expand.csv-spec index 7cc11c6fab5b3..ae27e8f56f9f7 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mv_expand.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mv_expand.csv-spec @@ -24,3 +24,77 @@ a:integer | b:keyword | j:keyword 3 | b | "a" 3 | b | "b" ; + + +explosion +row +a = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +b = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +c = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +d = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +e = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +f = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +g = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +x = 10000000000000 +| mv_expand a | mv_expand b | mv_expand c | mv_expand d | mv_expand e | mv_expand f | mv_expand g +| limit 10; + +a:integer | b:integer | c:integer | d:integer | e:integer | f:integer | g:integer | x:long +1 | 1 | 1 | 1 | 1 | 1 | 1 | 10000000000000 +1 | 1 | 1 | 1 | 1 | 1 | 2 | 10000000000000 +1 | 1 | 1 | 1 | 1 | 1 | 3 | 10000000000000 +1 | 1 | 1 | 1 | 1 | 1 | 4 | 10000000000000 +1 | 1 | 1 | 1 | 1 | 1 | 5 | 10000000000000 +1 | 1 | 1 | 1 | 1 | 1 | 6 | 10000000000000 +1 | 1 | 1 | 1 | 1 | 1 | 7 | 10000000000000 +1 | 1 | 1 | 1 | 1 | 1 | 8 | 10000000000000 +1 | 1 | 1 | 1 | 1 | 1 | 9 | 10000000000000 +1 | 1 | 1 | 1 | 1 | 1 | 10 | 10000000000000 +; + + +explosionStats +row +a = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +b = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +c = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +d = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +e = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +x = 10000000000000 +| mv_expand a | mv_expand b | mv_expand c | mv_expand d | mv_expand e +| stats sum_a = sum(a) by b +| sort b; + +//12555000 = sum(1..30) * 30 * 30 * 30 +sum_a:long | b:integer +12555000 | 1 +12555000 | 2 +12555000 | 3 +12555000 | 4 +12555000 | 5 +12555000 | 6 +12555000 | 7 +12555000 | 8 +12555000 | 9 +12555000 | 10 +12555000 | 11 +12555000 | 12 +12555000 | 13 +12555000 | 14 +12555000 | 15 +12555000 | 16 +12555000 | 17 +12555000 | 18 +12555000 | 19 +12555000 | 20 +12555000 | 21 +12555000 | 22 
+12555000 | 23 +12555000 | 24 +12555000 | 25 +12555000 | 26 +12555000 | 27 +12555000 | 28 +12555000 | 29 +12555000 | 30 +; diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java index 0017a8600a013..2712ef8d2f59b 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java @@ -862,7 +862,6 @@ public void testFromStatsLimit() { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99826") public void testFromLimit() { try (EsqlQueryResponse results = run("from test | keep data | limit 2")) { logger.info(results); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index b86072e1b6da0..bdc1c948f2055 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -584,7 +584,8 @@ private PhysicalOperation planLimit(LimitExec limit, LocalExecutionPlannerContex private PhysicalOperation planMvExpand(MvExpandExec mvExpandExec, LocalExecutionPlannerContext context) { PhysicalOperation source = plan(mvExpandExec.child(), context); - return source.with(new MvExpandOperator.Factory(source.layout.get(mvExpandExec.target().id()).channel()), source.layout); + int blockSize = 5000;// TODO estimate row size and use context.pageSize() + return source.with(new MvExpandOperator.Factory(source.layout.get(mvExpandExec.target().id()).channel(), blockSize), source.layout); } /** diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elser/ElserMlNodeServiceSettings.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elser/ElserMlNodeServiceSettings.java index 7dffbc693ca51..d1f27302f85f1 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elser/ElserMlNodeServiceSettings.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elser/ElserMlNodeServiceSettings.java @@ -87,10 +87,21 @@ public ElserMlNodeServiceSettings(int numAllocations, int numThreads, String var public ElserMlNodeServiceSettings(StreamInput in) throws IOException { numAllocations = in.readVInt(); numThreads = in.readVInt(); - if (in.getTransportVersion().onOrAfter(TransportVersions.ELSER_SERVICE_MODEL_VERSION_ADDED)) { + if (transportVersionIsCompatibleWithElserModelVersion(in.getTransportVersion())) { modelVariant = in.readString(); } else { - modelVariant = ElserMlNodeService.ELSER_V1_MODEL; + modelVariant = ElserMlNodeService.ELSER_V2_MODEL; + } + } + + static boolean transportVersionIsCompatibleWithElserModelVersion(TransportVersion transportVersion) { + var nextNonPatchVersion = TransportVersions.PLUGIN_DESCRIPTOR_OPTIONAL_CLASSNAME; + + if (transportVersion.onOrAfter(TransportVersions.ELSER_SERVICE_MODEL_VERSION_ADDED)) { + return true; + } else { + return transportVersion.onOrAfter(TransportVersions.ELSER_SERVICE_MODEL_VERSION_ADDED_PATCH) + && transportVersion.before(nextNonPatchVersion); } } @@ -130,7 +141,7 @@ public TransportVersion 
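/* the write side below applies the same compatibility gate as the read side */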
getMinimalSupportedVersion() { public void writeTo(StreamOutput out) throws IOException { out.writeVInt(numAllocations); out.writeVInt(numThreads); - if (out.getTransportVersion().onOrAfter(TransportVersions.ELSER_SERVICE_MODEL_VERSION_ADDED)) { + if (transportVersionIsCompatibleWithElserModelVersion(out.getTransportVersion())) { out.writeString(modelVariant); } } diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/elser/ElserMlNodeServiceSettingsTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/elser/ElserMlNodeServiceSettingsTests.java index 35d5c0b8e9603..8b6f3f1a56ba6 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/elser/ElserMlNodeServiceSettingsTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/elser/ElserMlNodeServiceSettingsTests.java @@ -7,10 +7,12 @@ package org.elasticsearch.xpack.inference.services.elser; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -85,6 +87,52 @@ public void testFromMapMissingOptions() { assertThat(e.getMessage(), containsString("[service_settings] does not contain the required setting [num_allocations]")); } + public void testTransportVersionIsCompatibleWithElserModelVersion() { + assertTrue( + ElserMlNodeServiceSettings.transportVersionIsCompatibleWithElserModelVersion( + TransportVersions.ELSER_SERVICE_MODEL_VERSION_ADDED + ) + ); + assertTrue( + ElserMlNodeServiceSettings.transportVersionIsCompatibleWithElserModelVersion( + TransportVersions.ELSER_SERVICE_MODEL_VERSION_ADDED_PATCH + ) + ); + + assertFalse( + ElserMlNodeServiceSettings.transportVersionIsCompatibleWithElserModelVersion(TransportVersions.ML_PACKAGE_LOADER_PLATFORM_ADDED) + ); + assertFalse( + ElserMlNodeServiceSettings.transportVersionIsCompatibleWithElserModelVersion( + TransportVersions.PLUGIN_DESCRIPTOR_OPTIONAL_CLASSNAME + ) + ); + assertFalse( + ElserMlNodeServiceSettings.transportVersionIsCompatibleWithElserModelVersion( + TransportVersions.UNIVERSAL_PROFILING_LICENSE_ADDED + ) + ); + } + + public void testBwcWrite() throws IOException { + { + var settings = new ElserMlNodeServiceSettings(1, 1, ".elser_model_1"); + var copy = copyInstance(settings, TransportVersions.ELSER_SERVICE_MODEL_VERSION_ADDED); + assertEquals(settings, copy); + } + { + var settings = new ElserMlNodeServiceSettings(1, 1, ".elser_model_1"); + var copy = copyInstance(settings, TransportVersions.PLUGIN_DESCRIPTOR_OPTIONAL_CLASSNAME); + assertNotEquals(settings, copy); + assertEquals(".elser_model_2", copy.getModelVariant()); + } + { + var settings = new ElserMlNodeServiceSettings(1, 1, ".elser_model_1"); + var copy = copyInstance(settings, TransportVersions.ELSER_SERVICE_MODEL_VERSION_ADDED_PATCH); + assertEquals(settings, copy); + } + } + public void testFromMapInvalidSettings() { var settingsMap = new HashMap( Map.of(ElserMlNodeServiceSettings.NUM_ALLOCATIONS, 0, ElserMlNodeServiceSettings.NUM_THREADS, -1)
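/* both settings are deliberately invalid here: allocations must be greater than zero and threads must be positive, so validation should reject each */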