Skip to content

Commit

Permalink
Add x-opaque-id to search slow logs (#31539)
Browse files Browse the repository at this point in the history
Add x-opaque-id to search slow logs only. Indexing slow log and audit
logs will be handled as separate PRs.

Relates #31521
  • Loading branch information
imotov committed Jun 25, 2018
1 parent 3814b85 commit 28d62cd
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.tasks.Task;

import java.io.Closeable;
import java.net.SocketAddress;
Expand Down Expand Up @@ -74,7 +75,7 @@ static Collection<String> returnHttpResponseBodies(Collection<FullHttpResponse>
static Collection<String> returnOpaqueIds(Collection<FullHttpResponse> responses) {
List<String> list = new ArrayList<>(responses.size());
for (HttpResponse response : responses) {
list.add(response.headers().get("X-Opaque-Id"));
list.add(response.headers().get(Task.X_OPAQUE_ID));
}
return list;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@
import org.elasticsearch.rest.action.search.RestMultiSearchAction;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.rest.action.search.RestSearchScrollAction;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.usage.UsageService;

Expand Down Expand Up @@ -369,7 +370,7 @@ public ActionModule(boolean transportClient, Settings settings, IndexNameExpress
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<String> headers = Stream.concat(
actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
Stream.of("X-Opaque-Id")
Stream.of(Task.X_OPAQUE_ID)
).collect(Collectors.toSet());
UnaryOperator<RestHandler> restWrapper = null;
for (ActionPlugin plugin : actionPlugins) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.tasks.Task;

import java.util.Collections;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -174,6 +175,11 @@ public String toString() {
} else {
sb.append("source[], ");
}
if (context.getTask().getHeader(Task.X_OPAQUE_ID) != null) {
sb.append("id[").append(context.getTask().getHeader(Task.X_OPAQUE_ID)).append("], ");
} else {
sb.append("id[], ");
}
return sb.toString();
}
}
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.snapshots.SnapshotShardsService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskResultsService;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -449,7 +450,7 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
final Transport transport = networkModule.getTransportSupplier().get();
Set<String> taskHeaders = Stream.concat(
pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
Stream.of("X-Opaque-Id")
Stream.of(Task.X_OPAQUE_ID)
).collect(Collectors.toSet());
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
Expand Down
5 changes: 5 additions & 0 deletions server/src/main/java/org/elasticsearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
*/
public class Task {

/**
* The request header to mark tasks with specific ids
*/
public static final String X_OPAQUE_ID = "X-Opaque-Id";

private final long id;

private final String type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ public void testSearchTaskDescriptions() {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

Map<String, String> headers = new HashMap<>();
headers.put("X-Opaque-Id", "my_id");
headers.put(Task.X_OPAQUE_ID, "my_id");
headers.put("Foo-Header", "bar");
headers.put("Custom-Task-Header", "my_value");
assertSearchResponse(
Expand Down Expand Up @@ -404,7 +404,7 @@ public void testSearchTaskHeaderLimit() {
int maxSize = Math.toIntExact(SETTING_HTTP_MAX_HEADER_SIZE.getDefault(Settings.EMPTY).getBytes() / 2 + 1);

Map<String, String> headers = new HashMap<>();
headers.put("X-Opaque-Id", "my_id");
headers.put(Task.X_OPAQUE_ID, "my_id");
headers.put("Custom-Task-Header", randomAlphaOfLengthBetween(maxSize, maxSize + 100));
IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
Expand All @@ -415,7 +415,7 @@ public void testSearchTaskHeaderLimit() {

private void assertTaskHeaders(TaskInfo taskInfo) {
assertThat(taskInfo.getHeaders().keySet(), hasSize(2));
assertEquals("my_id", taskInfo.getHeaders().get("X-Opaque-Id"));
assertEquals("my_id", taskInfo.getHeaders().get(Task.X_OPAQUE_ID));
assertEquals("my_value", taskInfo.getHeaders().get("Custom-Task-Header"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.index;

import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -34,12 +35,15 @@
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.TestSearchContext;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collections;

import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -166,10 +170,12 @@ public void testSlowLogSearchContextPrinterToLog() throws IOException {
SearchContext searchContext = createSearchContext(index);
SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery());
searchContext.request().source(source);
searchContext.setTask(new SearchTask(0, "n/a", "n/a", "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "my_id")));
SearchSlowLog.SlowLogSearchContextPrinter p = new SearchSlowLog.SlowLogSearchContextPrinter(searchContext, 10);
assertThat(p.toString(), startsWith("[foo][0]"));
// Makes sure that output doesn't contain any new lines
assertThat(p.toString(), not(containsString("\n")));
assertThat(p.toString(), endsWith("id[my_id], "));
}

public void testLevelSetting() {
Expand Down

0 comments on commit 28d62cd

Please sign in to comment.