Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EQL: Optimize string retention #66207

Merged
merged 2 commits into from
Dec 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;
import org.elasticsearch.xpack.eql.execution.search.QueryRequest;
import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey;
import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;

import java.util.List;
Expand All @@ -24,6 +23,7 @@ public class Criterion<Q extends QueryRequest> {
private final HitExtractor tiebreaker;

private final boolean descending;
private final int keySize;

public Criterion(int stage,
Q queryRequest,
Expand All @@ -38,6 +38,8 @@ public Criterion(int stage,
this.tiebreaker = tiebreaker;

this.descending = descending;

this.keySize = keys.size();
}

public int stage() {
Expand All @@ -52,20 +54,14 @@ public Q queryRequest() {
return queryRequest;
}

public int keySize() {
return keys.size();
}

public SequenceKey key(SearchHit hit) {
SequenceKey key;
if (keys.isEmpty()) {
key = SequenceKey.NONE;
} else {
Object[] docKeys = new Object[keys.size()];
for (int i = 0; i < docKeys.length; i++) {
public Object[] key(SearchHit hit) {
Object[] key = null;
if (keySize > 0) {
Object[] docKeys = new Object[keySize];
for (int i = 0; i < keySize; i++) {
docKeys[i] = keys.get(i).extract(hit);
}
key = new SequenceKey(docKeys);
key = docKeys;
}
return key;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ public class HitReference {

private final String index;
private final String id;


public HitReference(String index, String id) {
this.index = index;
this.id = id;
}

public HitReference(SearchHit hit) {
this.index = hit.getIndex();
this.id = hit.getId();
this(hit.getIndex(), hit.getId());
}

public String index() {
return index;
}
Expand All @@ -38,11 +42,11 @@ public boolean equals(Object obj) {
if (this == obj) {
return true;
}

if (obj == null || getClass() != obj.getClass()) {
return false;
}

HitReference other = (HitReference) obj;
return Objects.equals(index, other.index)
&& Objects.equals(id, other.id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class SequenceKey {
private final Object[] keys;
private final int hashCode;

public SequenceKey(Object... keys) {
SequenceKey(Object... keys) {
this.keys = keys;
this.hashCode = Objects.hash(keys);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import org.elasticsearch.xpack.ql.util.ActionListeners;

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.action.ActionListener.wrap;
import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.searchHits;
Expand All @@ -47,8 +49,23 @@
*/
public class TumblingWindow implements Executable {

private static final int CACHE_MAX_SIZE = 64;

private final Logger log = LogManager.getLogger(TumblingWindow.class);

/**
* Simple cache for removing duplicate strings (such as index name or common keys).
* Designed to be low-effort, non-concurrent (not needed) and thus optimistic in nature.
* Thus it has a small, upper limit so that it doesn't require any cleaning up.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have a Cache class in the common lib, which is more complex and supports concurrency, could we have a comment here that concurrency is not needed?

*/
// start with the default size and allow growth until the max size
private final Map<String, String> stringCache = new LinkedHashMap<>(16, 0.75f, true) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One things to consider: Start with the CACHE_MAX_SIZE instead of the default size (16). The HashMap will have to grow anyways, unless you think that the chances of having <= 32 different strings are high.

@Override
protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
return this.size() >= CACHE_MAX_SIZE;
}
};

private final QueryClient client;
private final List<Criterion<BoxedQueryRequest>> criteria;
private final Criterion<BoxedQueryRequest> until;
Expand Down Expand Up @@ -522,6 +539,28 @@ private TimeValue timeTook() {
return new TimeValue(System.currentTimeMillis() - startTime);
}

private String cache(String string) {
String value = stringCache.putIfAbsent(string, string);
return value == null ? string : value;
}

private SequenceKey key(Object[] keys) {
SequenceKey key;
if (keys == null) {
key = SequenceKey.NONE;
} else {
for (int i = 0; i < keys.length; i++) {
Object o = keys[i];
if (o instanceof String) {
keys[i] = cache((String) o);
}
}
key = new SequenceKey(keys);
}

return key;
}

private static Ordinal headOrdinal(List<SearchHit> hits, Criterion<BoxedQueryRequest> criterion) {
return criterion.ordinal(hits.get(0));
}
Expand Down Expand Up @@ -565,9 +604,9 @@ public boolean hasNext() {
@Override
public Tuple<KeyAndOrdinal, HitReference> next() {
SearchHit hit = delegate.next();
SequenceKey k = criterion.key(hit);
SequenceKey k = key(criterion.key(hit));
Ordinal o = criterion.ordinal(hit);
return new Tuple<>(new KeyAndOrdinal(k, o), new HitReference(hit));
return new Tuple<>(new KeyAndOrdinal(k, o), new HitReference(cache(hit.getIndex()), hit.getId()));
}
};
};
Expand Down