Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TEST] Add connection accounting tests #85966

Merged
merged 1 commit into from
Apr 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions server/src/main/java/org/elasticsearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,15 @@ public Releasable registerChildConnection(long taskId, Transport.Connection chil
return null;
}

// package private for testing
Integer childTasksPerConnection(long taskId, Transport.Connection childConnection) {
final CancellableTaskHolder holder = cancellableTasks.get(taskId);
if (holder != null) {
return holder.childTasksPerConnection.get(childConnection);
}
return null;
}

/**
* Stores the task failure
*/
Expand Down
84 changes: 84 additions & 0 deletions server/src/test/java/org/elasticsearch/tasks/TaskManagerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.TransportTasksActionTests;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
Expand All @@ -24,7 +25,10 @@
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpTransportChannel;
import org.elasticsearch.transport.TestTransportChannels;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -230,6 +234,32 @@ void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitF
assertThat(taskManager.numberOfChannelPendingTaskTrackers(), equalTo(0));
}

public void testTaskAccounting() {
final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Set.of());

final Task task1 = taskManager.register("transport", "test", new CancellableRequest("thread 1"));
final Task task2 = taskManager.register("transport", "test", new CancellableRequest("thread 2"));

final MockConnection connection1 = new MockConnection();
final MockConnection connection2 = new MockConnection();

Releasable releasableConnection1 = taskManager.registerChildConnection(task1.getId(), connection1);
Releasable releasableConnection2 = taskManager.registerChildConnection(task2.getId(), connection2);
Releasable releasableConnection3 = taskManager.registerChildConnection(task1.getId(), connection1);

assertEquals(2, taskManager.childTasksPerConnection(task1.getId(), connection1).intValue());
assertEquals(1, taskManager.childTasksPerConnection(task2.getId(), connection2).intValue());

releasableConnection1.close();
assertEquals(1, taskManager.childTasksPerConnection(task1.getId(), connection1).intValue());

releasableConnection2.close();
assertNull(taskManager.childTasksPerConnection(task2.getId(), connection2));

releasableConnection3.close();
assertNull(taskManager.childTasksPerConnection(task1.getId(), connection1));
}

static class CancellableRequest extends TransportRequest {
private final String requestId;

Expand Down Expand Up @@ -265,4 +295,58 @@ public void addCloseListener(ActionListener<Void> listener) {
super.addCloseListener(listener);
}
}

public static final class MockConnection implements Transport.Connection {
@Override
public DiscoveryNode getNode() {
return null;
}

@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
throw new UnsupportedOperationException();
}

@Override
public void addCloseListener(ActionListener<Void> listener) {}

@Override
public void addRemovedListener(ActionListener<Void> listener) {}

@Override
public boolean isClosed() {
return false;
}

@Override
public void close() {
throw new UnsupportedOperationException();
}

@Override
public void onRemoved() {
throw new UnsupportedOperationException();
}

@Override
public void incRef() {}

@Override
public boolean tryIncRef() {
return true;
}

@Override
public boolean decRef() {
assert false : "shouldn't release a mock connection";
return false;
}

@Override
public boolean hasReferences() {
return true;
}
}

}