Merge branch 'main' into mapexpression-function-arg
fang-xing-esql committed Dec 23, 2024
2 parents e12c2f1 + edb3818 commit d1e122d
Showing 47 changed files with 1,280 additions and 423 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/118016.yaml
@@ -0,0 +1,6 @@
pr: 118016
summary: Propagate status codes from shard failures appropriately
area: Search
type: enhancement
issues:
- 118482
6 changes: 6 additions & 0 deletions docs/changelog/118870.yaml
@@ -0,0 +1,6 @@
pr: 118870
summary: Rewrite TO_UPPER/TO_LOWER comparisons
area: ES|QL
type: enhancement
issues:
- 118304
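
For context, a hedged illustration (not taken from this diff): such a rewrite presumably lets a filter like

    FROM employees | WHERE TO_LOWER(first_name) == "georgi"

be evaluated as a case-insensitive equality on first_name directly, while a comparison whose literal can never be produced by the casing function, such as TO_LOWER(first_name) == "Georgi", can be folded to false.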
6 changes: 6 additions & 0 deletions docs/changelog/118999.yaml
@@ -0,0 +1,6 @@
pr: 118999
summary: Fix loss of context in the inference API for streaming APIs
area: Machine Learning
type: bug
issues:
- 119000
5 changes: 5 additions & 0 deletions docs/changelog/119131.yaml
@@ -0,0 +1,5 @@
pr: 119131
summary: Expose BwC enrich cache setting in plugin
area: Ingest Node
type: bug
issues: []
3 changes: 3 additions & 0 deletions muted-tests.yml
@@ -305,6 +305,9 @@ tests:
- class: org.elasticsearch.xpack.esql.action.EsqlNodeFailureIT
method: testFailureLoadingFields
  issue: https://github.com/elastic/elasticsearch/issues/118000
- class: org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT
method: test {yaml=indices.create/20_synthetic_source/create index with use_synthetic_source}
  issue: https://github.com/elastic/elasticsearch/issues/119191

# Examples:
#
@@ -73,15 +73,21 @@ public RestStatus status() {
            // on coordinator node. so get the status from cause instead of returning SERVICE_UNAVAILABLE blindly
            return getCause() == null ? RestStatus.SERVICE_UNAVAILABLE : ExceptionsHelper.status(getCause());
        }
-        RestStatus status = shardFailures[0].status();
-        if (shardFailures.length > 1) {
-            for (int i = 1; i < shardFailures.length; i++) {
-                if (shardFailures[i].status().getStatus() >= RestStatus.INTERNAL_SERVER_ERROR.getStatus()) {
-                    status = shardFailures[i].status();
-                }
-            }
-        }
-        return status;
+        RestStatus status = null;
+        for (ShardSearchFailure shardFailure : shardFailures) {
+            RestStatus shardStatus = shardFailure.status();
+            int statusCode = shardStatus.getStatus();
+
+            // Return if it's an error that can be retried.
+            // These currently take precedence over other status code(s).
+            if (statusCode >= 502 && statusCode <= 504) {
+                return shardStatus;
+            } else if (statusCode >= 500) {
+                status = shardStatus;
+            }
+        }
+
+        return status == null ? shardFailures[0].status() : status;
    }

    public ShardSearchFailure[] shardFailures() {
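The new precedence rule is easier to see in isolation. Below is a minimal, self-contained sketch of the same logic over plain int status codes; the method name and array-based signature are illustrative, not part of the Elasticsearch source:

// Illustrative distillation of the rule above: a retriable gateway error
// (502-504) is returned immediately, any other 5xx is remembered, and the
// first shard's status is the fallback when no 5xx was seen at all.
static int pickStatus(int[] shardStatusCodes) {
    int rememberedServerError = -1;
    for (int code : shardStatusCodes) {
        if (code >= 502 && code <= 504) {
            return code; // retriable errors take precedence
        }
        if (code >= 500) {
            rememberedServerError = code;
        }
    }
    return rememberedServerError == -1 ? shardStatusCodes[0] : rememberedServerError;
}

For example, pickStatus(new int[] { 500, 502 }) yields 502, which is exactly the expectation encoded in testWithRetriableCodesThatTakePrecedence below.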
@@ -22,6 +22,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContent;
import org.elasticsearch.xcontent.XContentParser;
@@ -31,6 +32,7 @@

import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

public class SearchPhaseExecutionExceptionTests extends ESTestCase {

@@ -168,4 +170,57 @@ public void testPhaseFailureWithSearchShardFailure() {

assertEquals(actual.status(), RestStatus.BAD_REQUEST);
}

public void testOnlyWithCodesThatDoNotRequirePrecedence() {
int pickedIndex = randomIntBetween(0, 1);

// Pick one of these exceptions randomly.
var searchExceptions = new ElasticsearchException[] {
new ElasticsearchException("simulated"),
new NodeDisconnectedException(null, "unused message", "unused action", null) };

// Status codes that map to searchExceptions.
var expectedStatusCodes = new RestStatus[] { RestStatus.INTERNAL_SERVER_ERROR, RestStatus.BAD_GATEWAY };

ShardSearchFailure shardFailure1 = new ShardSearchFailure(
searchExceptions[pickedIndex],
new SearchShardTarget("nodeID", new ShardId("someIndex", "someUUID", 1), null)
);

ShardSearchFailure shardFailure2 = new ShardSearchFailure(
searchExceptions[pickedIndex],
new SearchShardTarget("nodeID", new ShardId("someIndex", "someUUID", 2), null)
);

SearchPhaseExecutionException ex = new SearchPhaseExecutionException(
"search",
"all shards failed",
new ShardSearchFailure[] { shardFailure1, shardFailure2 }
);

assertThat(ex.status(), is(expectedStatusCodes[pickedIndex]));
}

public void testWithRetriableCodesThatTakePrecedence() {
// Maps to a 500.
ShardSearchFailure shardFailure1 = new ShardSearchFailure(
new ElasticsearchException("simulated"),
new SearchShardTarget("nodeID", new ShardId("someIndex", "someUUID", 1), null)
);

// Maps to a 502.
ShardSearchFailure shardFailure2 = new ShardSearchFailure(
new NodeDisconnectedException(null, "unused message", "unused action", null),
new SearchShardTarget("nodeID", new ShardId("someIndex", "someUUID", 2), null)
);

SearchPhaseExecutionException ex = new SearchPhaseExecutionException(
"search",
"all shards failed",
new ShardSearchFailure[] { shardFailure1, shardFailure2 }
);

// The 502 takes precedence over 500.
assertThat(ex.status(), is(RestStatus.BAD_GATEWAY));
}
}
@@ -308,7 +308,8 @@ public List<Setting<?>> getSettings() {
COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST,
COORDINATOR_PROXY_QUEUE_CAPACITY,
ENRICH_MAX_FORCE_MERGE_ATTEMPTS,
-            CACHE_SIZE
+            CACHE_SIZE,
+            CACHE_SIZE_BWC
);
}

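Registering the BwC setting matters because Elasticsearch rejects node settings that no installed plugin declares, so a node still configured with the legacy key would fail to start. A minimal sketch of the pattern, with illustrative keys, types, and defaults rather than the plugin's actual definitions:

// Hedged sketch (keys and defaults are assumptions): the legacy key stays
// registered, flagged Deprecated, alongside its replacement so that old
// configurations keep booting while emitting a deprecation warning.
static final Setting<Long> CACHE_SIZE =
    Setting.longSetting("enrich.cache.size", 1000L, 0L, Setting.Property.NodeScope);
static final Setting<Long> CACHE_SIZE_BWC =
    Setting.longSetting("enrich.cache_size", 1000L, 0L, Setting.Property.NodeScope, Setting.Property.Deprecated);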
@@ -15,6 +15,8 @@
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.type.EsField;

import java.util.regex.Pattern;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
@@ -26,6 +28,8 @@
public final class TestUtils {
private TestUtils() {}

private static final Pattern WS_PATTERN = Pattern.compile("\\s");

public static Literal of(Object value) {
return of(Source.EMPTY, value);
}
@@ -59,4 +63,9 @@ public static FieldAttribute getFieldAttribute(String name) {
public static FieldAttribute getFieldAttribute(String name, DataType dataType) {
return new FieldAttribute(EMPTY, name, new EsField(name + "f", dataType, emptyMap(), true));
}

/** Similar to {@link String#strip()}, but removes whitespace throughout the entire string. */
public static String stripThrough(String input) {
return WS_PATTERN.matcher(input).replaceAll(StringUtils.EMPTY);
}
}
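
A quick usage contrast with String#strip(), on illustrative inputs:

// strip() only trims the ends; stripThrough removes every \s match.
String stripped = " FROM a | WHERE b > 1 ".strip();                  // "FROM a | WHERE b > 1"
String collapsed = TestUtils.stripThrough(" FROM a | WHERE b > 1 "); // "FROMa|WHEREb>1"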

Some generated files are not rendered by default.

@@ -244,8 +244,20 @@ $endif$
$if(long||double)$
values = new LongLongHash(1, bigArrays);
$elseif(BytesRef)$
-        values = new LongLongHash(1, bigArrays);
-        bytes = new BytesRefHash(1, bigArrays);
+        LongLongHash _values = null;
+        BytesRefHash _bytes = null;
+        try {
+            _values = new LongLongHash(1, bigArrays);
+            _bytes = new BytesRefHash(1, bigArrays);
+
+            values = _values;
+            bytes = _bytes;
+
+            _values = null;
+            _bytes = null;
+        } finally {
+            Releasables.closeExpectNoException(_values, _bytes);
+        }
$elseif(int||float)$
values = new LongHash(1, bigArrays);
$endif$
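The generated change above is the standard acquire-then-transfer idiom for allocating several releasable resources without leaking: temporaries hold the allocations, success transfers ownership to the fields and nulls the temporaries, and the finally block releases only whatever an exception left behind. A general, self-contained sketch of the same idiom under assumed names, with plain Closeable standing in for the hash tables:

import java.io.Closeable;
import java.io.IOException;
import java.util.function.Supplier;

final class PairOwner implements Closeable {
    private final Closeable first;
    private final Closeable second;

    PairOwner(Supplier<Closeable> open) {
        Closeable a = null;
        Closeable b = null;
        try {
            a = open.get();
            b = open.get();   // if this throws, 'a' is released below
            this.first = a;
            this.second = b;
            a = null;         // ownership transferred: disarm the cleanup
            b = null;
        } finally {
            closeQuietly(a, b); // no-op on the success path
        }
    }

    private static void closeQuietly(Closeable... resources) {
        for (Closeable resource : resources) {
            if (resource != null) {
                try {
                    resource.close();
                } catch (IOException e) {
                    // best-effort cleanup in this sketch
                }
            }
        }
    }

    @Override
    public void close() {
        closeQuietly(first, second);
    }
}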
@@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.aggregation;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.compute.operator.SequenceBytesRefBlockSourceOperator;
import org.elasticsearch.compute.operator.SourceOperator;

import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.containsInAnyOrder;

public class ValuesBytesRefAggregatorFunctionTests extends AggregatorFunctionTestCase {
@Override
protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
return new SequenceBytesRefBlockSourceOperator(
blockFactory,
IntStream.range(0, size).mapToObj(l -> new BytesRef(randomAlphaOfLengthBetween(0, 100)))
);
}

@Override
protected AggregatorFunctionSupplier aggregatorFunction(List<Integer> inputChannels) {
return new ValuesBytesRefAggregatorFunctionSupplier(inputChannels);
}

@Override
protected String expectedDescriptionOfAggregator() {
return "values of bytes";
}

@Override
public void assertSimpleOutput(List<Block> input, Block result) {
TreeSet<?> set = new TreeSet<>((List<?>) BlockUtils.toJavaObject(result, 0));
Object[] values = input.stream()
.flatMap(AggregatorFunctionTestCase::allBytesRefs)
.collect(Collectors.toSet())
.toArray(Object[]::new);
if (false == set.containsAll(Arrays.asList(values))) {
assertThat(set, containsInAnyOrder(values));
}
}
}
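
The containsAll guard before assertThat in assertSimpleOutput is a deliberate pattern: Hamcrest's containsInAnyOrder does quadratic matching and builds a detailed mismatch description, so the test presumably pays that cost only once the cheap check has already failed. In general form, with illustrative names:

// Fast path: containsAll is cheap and usually passes. Slow path: the
// containsInAnyOrder matcher runs only when a failure must be reported.
if (false == actual.containsAll(expected)) {
    assertThat(actual, containsInAnyOrder(expected.toArray()));
}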
@@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.aggregation;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.LongBytesRefTupleBlockSourceOperator;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.core.Tuple;

import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

public class ValuesBytesRefGroupingAggregatorFunctionTests extends GroupingAggregatorFunctionTestCase {
@Override
protected AggregatorFunctionSupplier aggregatorFunction(List<Integer> inputChannels) {
return new ValuesBytesRefAggregatorFunctionSupplier(inputChannels);
}

@Override
protected String expectedDescriptionOfAggregator() {
return "values of bytes";
}

@Override
protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
return new LongBytesRefTupleBlockSourceOperator(
blockFactory,
IntStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), new BytesRef(randomAlphaOfLengthBetween(0, 100))))
);
}

@Override
public void assertSimpleGroup(List<Page> input, Block result, int position, Long group) {
Object[] values = input.stream().flatMap(p -> allBytesRefs(p, group)).collect(Collectors.toSet()).toArray(Object[]::new);
Object resultValue = BlockUtils.toJavaObject(result, position);
switch (values.length) {
case 0 -> assertThat(resultValue, nullValue());
case 1 -> assertThat(resultValue, equalTo(values[0]));
default -> {
TreeSet<?> set = new TreeSet<>((List<?>) resultValue);
if (false == set.containsAll(Arrays.asList(values))) {
assertThat(set, containsInAnyOrder(values));
}
}
}
}
}
@@ -143,23 +143,31 @@ public final void testSimpleWithCranky() {

DriverContext driverContext = crankyDriverContext();

+        Exception exception = null;
        boolean driverStarted = false;
        try {
            Operator operator = simple().get(driverContext);
            driverStarted = true;
            drive(operator, input.iterator(), driverContext);
            // Either we get lucky and cranky doesn't throw and the test completes or we don't and it throws
        } catch (CircuitBreakingException e) {
            logger.info("broken", e);
            assertThat(e.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE));
+            exception = e;
        }
        if (driverStarted == false) {
            // if drive hasn't even started then we need to release the input pages
            Releasables.closeExpectNoException(Releasables.wrap(() -> Iterators.map(input.iterator(), p -> p::releaseBlocks)));
        }

        // Note the lack of try/finally here - we're asserting that when the driver throws an exception we clear the breakers.
-        assertThat(inputFactoryContext.breaker().getUsed(), equalTo(0L));
+        long inputUsedBytes = inputFactoryContext.breaker().getUsed();
+        if (inputUsedBytes != 0L) {
+            fail(exception, "Expected no used bytes for input, found: " + inputUsedBytes);
+        }
+        long driverUsedBytes = driverContext.breaker().getUsed();
+        if (driverUsedBytes != 0L) {
+            fail(exception, "Expected no used bytes for driver, found: " + driverUsedBytes);
+        }
    }

/**
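The fail(exception, message) calls above chain the earlier CircuitBreakingException, when there was one, into the assertion failure, so a leaked-bytes report still shows what interrupted the driver. The same effect in plain Java, with an assumed helper name:

// Hedged sketch: attach the prior exception (possibly null) as the cause
// of the post-condition failure so both facts land in one stack trace.
static void expectNoUsedBytes(String label, long usedBytes, Exception earlier) {
    if (usedBytes != 0L) {
        throw new AssertionError("Expected no used bytes for " + label + ", found: " + usedBytes, earlier);
    }
}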
@@ -21,7 +21,7 @@
import java.util.List;

import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled;
-import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V8;
+import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V9;
import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.Mode.ASYNC;

public class MixedClusterEsqlSpecIT extends EsqlSpecTestCase {
@@ -96,7 +96,7 @@ protected boolean supportsInferenceTestService() {

@Override
protected boolean supportsIndexModeLookup() throws IOException {
-        return hasCapabilities(List.of(JOIN_LOOKUP_V8.capabilityName()));
+        return hasCapabilities(List.of(JOIN_LOOKUP_V9.capabilityName()));
}

@Override