Skip to content

Commit

Permalink
Add connection accounting tests (#85966)
Browse files Browse the repository at this point in the history
  • Loading branch information
grcevski authored Apr 18, 2022
1 parent c2e488c commit a3cbf12
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 0 deletions.
9 changes: 9 additions & 0 deletions server/src/main/java/org/elasticsearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,15 @@ public Releasable registerChildConnection(long taskId, Transport.Connection chil
return null;
}

// package private for testing
Integer childTasksPerConnection(long taskId, Transport.Connection childConnection) {
final CancellableTaskHolder holder = cancellableTasks.get(taskId);
if (holder != null) {
return holder.childTasksPerConnection.get(childConnection);
}
return null;
}

/**
* Stores the task failure
*/
Expand Down
84 changes: 84 additions & 0 deletions server/src/test/java/org/elasticsearch/tasks/TaskManagerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.TransportTasksActionTests;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
Expand All @@ -24,7 +25,10 @@
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpTransportChannel;
import org.elasticsearch.transport.TestTransportChannels;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -230,6 +234,32 @@ void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitF
assertThat(taskManager.numberOfChannelPendingTaskTrackers(), equalTo(0));
}

public void testTaskAccounting() {
final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Set.of());

final Task task1 = taskManager.register("transport", "test", new CancellableRequest("thread 1"));
final Task task2 = taskManager.register("transport", "test", new CancellableRequest("thread 2"));

final MockConnection connection1 = new MockConnection();
final MockConnection connection2 = new MockConnection();

Releasable releasableConnection1 = taskManager.registerChildConnection(task1.getId(), connection1);
Releasable releasableConnection2 = taskManager.registerChildConnection(task2.getId(), connection2);
Releasable releasableConnection3 = taskManager.registerChildConnection(task1.getId(), connection1);

assertEquals(2, taskManager.childTasksPerConnection(task1.getId(), connection1).intValue());
assertEquals(1, taskManager.childTasksPerConnection(task2.getId(), connection2).intValue());

releasableConnection1.close();
assertEquals(1, taskManager.childTasksPerConnection(task1.getId(), connection1).intValue());

releasableConnection2.close();
assertNull(taskManager.childTasksPerConnection(task2.getId(), connection2));

releasableConnection3.close();
assertNull(taskManager.childTasksPerConnection(task1.getId(), connection1));
}

static class CancellableRequest extends TransportRequest {
private final String requestId;

Expand Down Expand Up @@ -265,4 +295,58 @@ public void addCloseListener(ActionListener<Void> listener) {
super.addCloseListener(listener);
}
}

public static final class MockConnection implements Transport.Connection {
@Override
public DiscoveryNode getNode() {
return null;
}

@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
throw new UnsupportedOperationException();
}

@Override
public void addCloseListener(ActionListener<Void> listener) {}

@Override
public void addRemovedListener(ActionListener<Void> listener) {}

@Override
public boolean isClosed() {
return false;
}

@Override
public void close() {
throw new UnsupportedOperationException();
}

@Override
public void onRemoved() {
throw new UnsupportedOperationException();
}

@Override
public void incRef() {}

@Override
public boolean tryIncRef() {
return true;
}

@Override
public boolean decRef() {
assert false : "shouldn't release a mock connection";
return false;
}

@Override
public boolean hasReferences() {
return true;
}
}

}

0 comments on commit a3cbf12

Please sign in to comment.