Merge branch 'main' into esql/multinode_tests

luigidellaquila committed Sep 6, 2023
2 parents f31fe30 + 75f0731, commit 3389bbf
Showing 18 changed files with 823 additions and 207 deletions.
docs/changelog/98996.yaml (5 additions, 0 deletions)
@@ -0,0 +1,5 @@
pr: 98996
summary: Reintroduce `sparse_vector` mapping
area: Mapping
type: enhancement
issues: []
docs/changelog/99188.yaml (6 additions, 0 deletions)
@@ -0,0 +1,6 @@
pr: 99188
summary: "ESQL: skip synthetic attributes when planning the physical fragment"
area: ES|QL
type: bug
issues:
- 99170
@@ -0,0 +1,143 @@
---
"Indexing and searching sparse vectors":

  - skip:
      version: " - 8.10.99"
      reason: "sparse_vector field type reintroduced in 8.11"

  - do:
      indices.create:
        index: test
        body:
          settings:
            number_of_replicas: 0
          mappings:
            properties:
              text:
                type: text
              ml.tokens:
                type: sparse_vector

  - match: { acknowledged: true }

  - do:
      index:
        index: test
        id: "1"
        body:
          text: "running is good for you"
          ml:
            tokens:
              running: 2.4097164
              good: 2.170997
              run: 2.052153
              race: 1.4575411
              for: 1.1908325
              runner: 1.1803857
              exercise: 1.1652642
              you: 0.9654308
              training: 0.94999343
              sports: 0.93650943
              fitness: 0.83129317
              best: 0.820365
              bad: 0.7385934
              health: 0.7098149
              marathon: 0.61555296
              gym: 0.5652374

  - match: { result: "created" }

  - do:
      index:
        index: test
        id: "2"
        body:
          text: "walking is a healthy exercise"
          ml:
            tokens:
              walking: 2.4797723
              exercise: 2.074234
              healthy: 1.971596
              walk: 1.6458614
              health: 1.5291847
              walker: 1.4736869
              activity: 1.0793462
              good: 1.0597849
              fitness: 0.91855437
              training: 0.86342937
              movement: 0.7657065
              normal: 0.6694081
              foot: 0.5892523
              physical: 0.4926789

  - match: { result: "created" }

  - do:
      indices.refresh: { }

  - do:
      search:
        index: test
        body:
          query:
            bool:
              should:
                - term:
                    ml.tokens:
                      value: "walk"
                      boost: 1.9790847
                - term:
                    ml.tokens:
                      value: "walking"
                      boost: 1.7092685
                - term:
                    ml.tokens:
                      value: "exercise"
                      boost: 0.84076905

  - match: { hits.total.value: 2 }
  - match: { hits.hits.0._id: "2" }
  - match: { hits.hits.1._id: "1" }

---
"Sparse vector in 7.x":
  - skip:
      features: allowed_warnings
      version: "8.0.0 - "
      reason: "sparse_vector field type supported in 7.x"
  - do:
      allowed_warnings:
        - "The [sparse_vector] field type is deprecated and will be removed in 8.0."
        - "[sparse_vector] field type in old 7.x indices is allowed to contain [sparse_vector] fields, but they cannot be indexed or searched."
      indices.create:
        index: test
        body:
          settings:
            number_of_replicas: 0
          mappings:
            properties:
              text:
                type: text
              ml.tokens:
                type: sparse_vector

  - match: { acknowledged: true }

---
"Sparse vector in 8.x":
  - skip:
      version: " - 7.99.99, 8.11.0 - "
      reason: "sparse_vector field type not supported in 8.x until 8.11.0"
  - do:
      catch: /The \[sparse_vector\] field type is no longer supported/
      indices.create:
        index: test
        body:
          settings:
            number_of_replicas: 0
          mappings:
            properties:
              text:
                type: text
              ml.tokens:
                type: sparse_vector
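A note on the first test above: the search is a bool query whose should clauses are ordinary term queries against ml.tokens, with the per-token query weights passed as boosts. Document 2 is expected to rank first because it carries high weights for walk, walking and exercise, while document 1 only matches exercise. A back-of-the-envelope sketch of that ordering in plain Java (not part of the commit; real Lucene scoring quantizes the stored weights, so absolute scores differ, but the ordering is driven by the same weighted overlap):

import java.util.Map;

public class SparseVectorOrderingSketch {

    // Score a document as the sum of (query token weight) * (document token weight)
    // over the tokens the document actually contains.
    static double score(Map<String, Double> query, Map<String, Double> doc) {
        double sum = 0;
        for (var e : query.entrySet()) {
            sum += e.getValue() * doc.getOrDefault(e.getKey(), 0.0);
        }
        return sum;
    }

    public static void main(String[] args) {
        // Query-side weights, taken from the boosts in the search request above.
        Map<String, Double> query = Map.of("walk", 1.9790847, "walking", 1.7092685, "exercise", 0.84076905);
        // Only the document tokens relevant to the query are repeated here.
        Map<String, Double> doc1 = Map.of("exercise", 1.1652642); // "running is good for you"
        Map<String, Double> doc2 = Map.of("walk", 1.6458614, "walking", 2.4797723, "exercise", 2.074234); // "walking is a healthy exercise"

        System.out.printf("doc1=%.3f doc2=%.3f%n", score(query, doc1), score(query, doc2));
    }
}

This prints roughly doc1 ≈ 0.98 versus doc2 ≈ 9.24, consistent with the assertions hits.hits.0._id == "2" and hits.hits.1._id == "1".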
@@ -18,6 +18,7 @@
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
@@ -33,7 +34,7 @@

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
@@ -179,7 +180,7 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) {
while (unassignedIterator.hasNext()) {
final ShardRouting shardRouting = unassignedIterator.next();
final UnassignedInfo unassignedInfo = shardRouting.unassignedInfo();
if (shardRouting.primary() && unassignedInfo.getLastAllocationStatus() == UnassignedInfo.AllocationStatus.NO_ATTEMPT) {
if (shardRouting.primary() && unassignedInfo.getLastAllocationStatus() == AllocationStatus.NO_ATTEMPT) {
unassignedIterator.updateUnassigned(
new UnassignedInfo(
unassignedInfo.getReason(),
Expand All @@ -189,7 +190,7 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) {
unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(),
unassignedInfo.isDelayed(),
UnassignedInfo.AllocationStatus.DECIDERS_NO,
AllocationStatus.DECIDERS_NO,
unassignedInfo.getFailedNodeIds(),
unassignedInfo.getLastAllocatedNodeId()
),
@@ -249,69 +250,60 @@ private void allocateUnassigned() {
final var shard = primary[i];
final var assignment = desiredBalance.getAssignment(shard.shardId());
final boolean ignored = assignment == null || isIgnored(routingNodes, shard, assignment);
final var isThrottled = new AtomicBoolean(false);
if (ignored == false) {
for (final var nodeIdIterator : List.of(
getDesiredNodesIds(shard, assignment),
getFallbackNodeIds(shard, isThrottled)
)) {
for (final var desiredNodeId : nodeIdIterator) {
final var routingNode = routingNodes.node(desiredNodeId);
if (routingNode == null) {
// desired node no longer exists
continue;
}
final var decision = allocation.deciders().canAllocate(shard, routingNode, allocation);
switch (decision.type()) {
case YES -> {
logger.debug("Assigning shard [{}] to [{}]", shard, desiredNodeId);
final long shardSize = DiskThresholdDecider.getExpectedShardSize(
shard,
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE,
allocation.clusterInfo(),
allocation.snapshotShardSizeInfo(),
allocation.metadata(),
allocation.routingTable()
);
routingNodes.initializeShard(shard, desiredNodeId, null, shardSize, allocation.changes());
allocationOrdering.recordAllocation(desiredNodeId);
if (shard.primary() == false) {
// copy over the same replica shards to the secondary array so they will get allocated
// in a subsequent iteration, allowing replicas of other shards to be allocated first
while (i < primaryLength - 1 && comparator.compare(primary[i], primary[i + 1]) == 0) {
secondary[secondaryLength++] = primary[++i];
}
AllocationStatus unallocatedStatus;
if (ignored) {
unallocatedStatus = AllocationStatus.NO_ATTEMPT;
} else {
unallocatedStatus = AllocationStatus.DECIDERS_NO;
final var nodeIdsIterator = new NodeIdsIterator(shard, assignment);
while (nodeIdsIterator.hasNext()) {
final var nodeId = nodeIdsIterator.next();
final var routingNode = routingNodes.node(nodeId);
if (routingNode == null) {
// desired node no longer exists
continue;
}
final var decision = allocation.deciders().canAllocate(shard, routingNode, allocation);
switch (decision.type()) {
case YES -> {
logger.debug("Assigning shard [{}] to {} [{}]", shard, nodeIdsIterator.source, nodeId);
final long shardSize = DiskThresholdDecider.getExpectedShardSize(
shard,
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE,
allocation.clusterInfo(),
allocation.snapshotShardSizeInfo(),
allocation.metadata(),
allocation.routingTable()
);
routingNodes.initializeShard(shard, nodeId, null, shardSize, allocation.changes());
allocationOrdering.recordAllocation(nodeId);
if (shard.primary() == false) {
// copy over the same replica shards to the secondary array so they will get allocated
// in a subsequent iteration, allowing replicas of other shards to be allocated first
while (i < primaryLength - 1 && comparator.compare(primary[i], primary[i + 1]) == 0) {
secondary[secondaryLength++] = primary[++i];
}
continue nextShard;
}
case THROTTLE -> {
isThrottled.set(true);
logger.trace("Couldn't assign shard [{}] to [{}]: {}", shard.shardId(), desiredNodeId, decision);
}
case NO -> {
logger.trace("Couldn't assign shard [{}] to [{}]: {}", shard.shardId(), desiredNodeId, decision);
}
continue nextShard;
}
case THROTTLE -> {
nodeIdsIterator.wasThrottled = true;
unallocatedStatus = AllocationStatus.DECIDERS_THROTTLED;
logger.trace("Couldn't assign shard [{}] to [{}]: {}", shard.shardId(), nodeId, decision);
}
case NO -> {
logger.trace("Couldn't assign shard [{}] to [{}]: {}", shard.shardId(), nodeId, decision);
}
}
}
}

logger.debug("No eligible node found to assign shard [{}] amongst [{}]", shard, assignment);

final UnassignedInfo.AllocationStatus allocationStatus;
if (ignored) {
allocationStatus = UnassignedInfo.AllocationStatus.NO_ATTEMPT;
} else if (isThrottled.get()) {
allocationStatus = UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED;
} else {
allocationStatus = UnassignedInfo.AllocationStatus.DECIDERS_NO;
}

unassigned.ignoreShard(shard, allocationStatus, allocation.changes());
logger.debug("No eligible node found to assign shard [{}]", shard);
unassigned.ignoreShard(shard, unallocatedStatus, allocation.changes());
if (shard.primary() == false) {
// we could not allocate it and we are a replica - check if we can ignore the other replicas
while (i < primaryLength - 1 && comparator.compare(primary[i], primary[i + 1]) == 0) {
unassigned.ignoreShard(primary[++i], allocationStatus, allocation.changes());
unassigned.ignoreShard(primary[++i], unallocatedStatus, allocation.changes());
}
}
}
@@ -323,6 +315,59 @@ private void allocateUnassigned() {
} while (primaryLength > 0);
}

private final class NodeIdsIterator implements Iterator<String> {

private final ShardRouting shard;

/**
* Contains the source of the nodeIds used for shard assignment. It could be:
* * desired - when using desired nodes
* * forced initial allocation - when initial allocation is forced to certain nodes by shrink/split/clone index operation
* * fallback - when assigning the primary shard is temporarily not possible on desired nodes,
* and it is assigned elsewhere in the cluster
*/
private NodeIdSource source;
private Iterator<String> nodeIds;

private boolean wasThrottled = false;

NodeIdsIterator(ShardRouting shard, ShardAssignment assignment) {
this.shard = shard;

var forcedInitialAllocation = allocation.deciders().getForcedInitialShardAllocationToNodes(shard, allocation);
if (forcedInitialAllocation.isPresent()) {
logger.debug("Shard [{}] initial allocation is forced to {}", shard.shardId(), forcedInitialAllocation.get());
nodeIds = allocationOrdering.sort(forcedInitialAllocation.get()).iterator();
source = NodeIdSource.FORCED_INITIAL_ALLOCATION;
} else {
nodeIds = allocationOrdering.sort(assignment.nodeIds()).iterator();
source = NodeIdSource.DESIRED;
}
}

@Override
public boolean hasNext() {
if (nodeIds.hasNext() == false && source != NodeIdSource.FALLBACK && shard.primary() && wasThrottled == false) {
var fallbackNodeIds = allocation.routingNodes().getAllNodeIds();
logger.debug("Shard [{}] assignment is temporarily not possible. Falling back to {}", shard.shardId(), fallbackNodeIds);
nodeIds = allocationOrdering.sort(fallbackNodeIds).iterator();
source = NodeIdSource.FALLBACK;
}
return nodeIds.hasNext();
}

@Override
public String next() {
return nodeIds.next();
}
}

private enum NodeIdSource {
DESIRED,
FORCED_INITIAL_ALLOCATION,
FALLBACK;
}
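The NodeIdsIterator above replaces the earlier two-pass loop over getDesiredNodesIds and getFallbackNodeIds: it serves the desired (or forced initial allocation) node ids first and, only for a primary shard that was not throttled, lazily falls back to every node in the cluster once those are exhausted. A minimal standalone sketch of that lazy-fallback iterator pattern (hypothetical names, not the Elasticsearch classes):

import java.util.Iterator;
import java.util.List;
import java.util.function.BooleanSupplier;

// Illustrative only: serves the preferred candidates first, then switches to a
// broader fallback list exactly once, and only while the supplied condition holds.
final class FallbackIterator implements Iterator<String> {
    private final List<String> fallback;
    private final BooleanSupplier fallbackAllowed;
    private Iterator<String> current;
    private boolean usedFallback = false;

    FallbackIterator(List<String> preferred, List<String> fallback, BooleanSupplier fallbackAllowed) {
        this.current = preferred.iterator();
        this.fallback = fallback;
        this.fallbackAllowed = fallbackAllowed;
    }

    @Override
    public boolean hasNext() {
        if (current.hasNext() == false && usedFallback == false && fallbackAllowed.getAsBoolean()) {
            current = fallback.iterator(); // switch source once, like DESIRED -> FALLBACK above
            usedFallback = true;
        }
        return current.hasNext();
    }

    @Override
    public String next() {
        return current.next();
    }
}

In the change above the switch condition is shard.primary() && wasThrottled == false && source != NodeIdSource.FALLBACK, and the fallback list is allocation.routingNodes().getAllNodeIds().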

private Iterable<String> getDesiredNodesIds(ShardRouting shard, ShardAssignment assignment) {
return allocationOrdering.sort(allocation.deciders().getForcedInitialShardAllocationToNodes(shard, allocation).map(forced -> {
logger.debug("Shard [{}] assignment is ignored. Initial allocation forced to {}", shard.shardId(), forced);
@@ -122,6 +122,7 @@ private static IndexVersion registerIndexVersion(int id, Version luceneVersion,
* Detached index versions added below here.
*/
public static final IndexVersion V_8_500_000 = registerIndexVersion(8_500_000, Version.LUCENE_9_7_0, "bf656f5e-5808-4eee-bf8a-e2bf6736ff55");
public static final IndexVersion V_8_500_001 = registerIndexVersion(8_500_001, Version.LUCENE_9_7_0, "45045a5a-fc57-4462-89f6-6bc04cda6015");
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
@@ -149,7 +150,7 @@ private static IndexVersion registerIndexVersion(int id, Version luceneVersion,
*/

private static class CurrentHolder {
private static final IndexVersion CURRENT = findCurrent(V_8_500_000);
private static final IndexVersion CURRENT = findCurrent(V_8_500_001);

// finds the pluggable current version, or uses the given fallback
private static IndexVersion findCurrent(IndexVersion fallback) {
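The last hunks register a new index version, V_8_500_001, and make CurrentHolder resolve to it, so it becomes the current index version (typically the version new indices are created with) unless a plugin supplies a later one. Feature code is then usually gated on whether an index was created on or after such a constant; a hypothetical sketch of that pattern (a stand-in class, not org.elasticsearch.index.IndexVersion and not code from this commit):

// Illustrative stand-in showing the usual shape of a feature gate keyed on the
// index's creation version id.
record IndexVersionId(int id) {
    boolean onOrAfter(IndexVersionId other) {
        return id >= other.id;
    }
}

class VersionGateSketch {
    // Ids mirror the constants registered above.
    static final IndexVersionId V_8_500_000 = new IndexVersionId(8_500_000);
    static final IndexVersionId V_8_500_001 = new IndexVersionId(8_500_001);

    // Hypothetical gate: indices created before the new constant keep the old behaviour.
    static boolean newBehaviourEnabled(IndexVersionId indexCreatedVersion) {
        return indexCreatedVersion.onOrAfter(V_8_500_001);
    }
}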
(The remaining changed files from this commit are not shown here.)