Skip to content

Commit

Permalink
[ML] Add debug logging for tests failing with empty model download (e…
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle authored and alexey-ivanov-es committed Nov 28, 2024
1 parent ea3a344 commit 744009c
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 7 deletions.
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,6 @@ tests:
- class: org.elasticsearch.xpack.remotecluster.RemoteClusterSecurityWithApmTracingRestIT
method: testTracingCrossCluster
issue: https://github.com/elastic/elasticsearch/issues/112731
- class: org.elasticsearch.xpack.inference.DefaultEndPointsIT
method: testInferDeploysDefaultElser
issue: https://github.com/elastic/elasticsearch/issues/114913
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=esql/60_usage/Basic ESQL usage output (telemetry)}
issue: https://github.com/elastic/elasticsearch/issues/115231
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.inference;

import org.elasticsearch.client.Request;
import org.elasticsearch.inference.TaskType;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.xpack.inference.services.elasticsearch.ElasticsearchInternalService;
Expand All @@ -27,8 +28,15 @@ public class DefaultEndPointsIT extends InferenceBaseRestTest {
private TestThreadPool threadPool;

@Before
public void createThreadPool() {
public void setupTest() throws IOException {
threadPool = new TestThreadPool(DefaultEndPointsIT.class.getSimpleName());

Request loggingSettings = new Request("PUT", "_cluster/settings");
loggingSettings.setJsonEntity("""
{"persistent" : {
"logger.org.elasticsearch.xpack.ml.packageloader" : "DEBUG"
}}""");
client().performRequest(loggingSettings);
}

@After
Expand Down Expand Up @@ -64,7 +72,7 @@ private static void assertDefaultElserConfig(Map<String, Object> modelConfig) {
assertThat(
modelConfig.toString(),
adaptiveAllocations,
Matchers.is(Map.of("enabled", true, "min_number_of_allocations", 0, "max_number_of_allocations", 8))
Matchers.is(Map.of("enabled", true, "min_number_of_allocations", 0, "max_number_of_allocations", 32))
);
}

Expand Down Expand Up @@ -99,7 +107,7 @@ private static void assertDefaultE5Config(Map<String, Object> modelConfig) {
assertThat(
modelConfig.toString(),
adaptiveAllocations,
Matchers.is(Map.of("enabled", true, "min_number_of_allocations", 0, "max_number_of_allocations", 8))
Matchers.is(Map.of("enabled", true, "min_number_of_allocations", 0, "max_number_of_allocations", 32))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.ml.packageloader.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
Expand Down Expand Up @@ -56,10 +58,12 @@
*/
final class ModelLoaderUtils {

private static final Logger logger = LogManager.getLogger(ModelLoaderUtils.class);

public static String METADATA_FILE_EXTENSION = ".metadata.json";
public static String MODEL_FILE_EXTENSION = ".pt";

private static ByteSizeValue VOCABULARY_SIZE_LIMIT = new ByteSizeValue(20, ByteSizeUnit.MB);
private static final ByteSizeValue VOCABULARY_SIZE_LIMIT = new ByteSizeValue(20, ByteSizeUnit.MB);
private static final String VOCABULARY = "vocabulary";
private static final String MERGES = "merges";
private static final String SCORES = "scores";
Expand All @@ -83,6 +87,7 @@ record BytesAndPartIndex(BytesArray bytes, int partIndex) {}
private final AtomicInteger currentPart;
private final int lastPartNumber;
private final byte[] buf;
private final RequestRange range; // TODO debug only

HttpStreamChunker(URI uri, RequestRange range, int chunkSize) {
var inputStream = getHttpOrHttpsInputStream(uri, range);
Expand All @@ -91,6 +96,7 @@ record BytesAndPartIndex(BytesArray bytes, int partIndex) {}
this.lastPartNumber = range.startPart() + range.numParts();
this.currentPart = new AtomicInteger(range.startPart());
this.buf = new byte[chunkSize];
this.range = range;
}

// This ctor exists for testing purposes only.
Expand All @@ -100,6 +106,7 @@ record BytesAndPartIndex(BytesArray bytes, int partIndex) {}
this.lastPartNumber = range.startPart() + range.numParts();
this.currentPart = new AtomicInteger(range.startPart());
this.buf = new byte[chunkSize];
this.range = range;
}

public boolean hasNext() {
Expand All @@ -113,6 +120,7 @@ public BytesAndPartIndex next() throws IOException {
int read = inputStream.read(buf, bytesRead, chunkSize - bytesRead);
// EOF??
if (read == -1) {
logger.debug("end of stream, " + bytesRead + " bytes read");
break;
}
bytesRead += read;
Expand All @@ -122,6 +130,7 @@ public BytesAndPartIndex next() throws IOException {
totalBytesRead.addAndGet(bytesRead);
return new BytesAndPartIndex(new BytesArray(buf, 0, bytesRead), currentPart.getAndIncrement());
} else {
logger.warn("Empty part in range " + range + ", current part=" + currentPart.get() + ", last part=" + lastPartNumber);
return new BytesAndPartIndex(BytesArray.EMPTY, currentPart.get());
}
}
Expand Down

0 comments on commit 744009c

Please sign in to comment.