Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zen2: Add node id to log output of CoordinatorTests #33929

Merged
merged 4 commits into from
Sep 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -557,10 +557,16 @@ protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest app
: "[" + currentPublication.get() + "] in progress, cannot start [" + publication + ']';
currentPublication = Optional.of(publication);

transportService.getThreadPool().schedule(publishTimeout, Names.GENERIC, () -> {
synchronized (mutex) {
transportService.getThreadPool().schedule(publishTimeout, Names.GENERIC, new Runnable() {
@Override
public void run() {
publication.onTimeout();
}

@Override
public String toString() {
return "scheduled timeout for " + publication;
}
});
publication.start(Collections.emptySet()); // TODO start failure detector and put faultyNodes here
}
Expand Down Expand Up @@ -625,15 +631,23 @@ protected void onFoundPeersUpdated() {
if (foundQuorum) {
if (electionScheduler == null) {
final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period
electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, () -> {
synchronized (mutex) {
if (mode == Mode.CANDIDATE) {
if (prevotingRound != null) {
prevotingRound.close();
electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() {
@Override
public void run() {
synchronized (mutex) {
if (mode == Mode.CANDIDATE) {
if (prevotingRound != null) {
prevotingRound.close();
}
prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes());
}
prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes());
}
}

@Override
public String toString() {
return "scheduling of new prevoting round";
}
});
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,22 @@ protected synchronized void done() {

private void notifyListener(ActionListener<V> listener, ExecutorService executorService) {
try {
executorService.submit(() -> {
try {
// call get in a non-blocking fashion as we could be on a network thread
// or another thread like the scheduler, which we should never block!
V value = FutureUtils.get(this, 0L, TimeUnit.NANOSECONDS);
listener.onResponse(value);
} catch (Exception e) {
listener.onFailure(e);
executorService.submit(new Runnable() {
@Override
public void run() {
try {
// call get in a non-blocking fashion as we could be on a network thread
// or another thread like the scheduler, which we should never block!
V value = FutureUtils.get(ListenableFuture.this, 0L, TimeUnit.NANOSECONDS);
listener.onResponse(value);
} catch (Exception e) {
listener.onFailure(e);
}
}

@Override
public String toString() {
return "ListenableFuture notification";
}
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,10 @@ public abstract class TransportResponse extends TransportMessage {

public static class Empty extends TransportResponse {
public static final Empty INSTANCE = new Empty();

@Override
public String toString() {
return "Empty{}";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.CloseableThreadContext;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -38,13 +39,8 @@
import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver;
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.disruption.DisruptableMockTransport;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matcher;

Expand All @@ -55,7 +51,6 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -184,15 +179,15 @@ class ClusterNode extends AbstractComponent {
private final PersistedState persistedState;
private MasterService masterService;
private TransportService transportService;
private MockTransport mockTransport;
private DisruptableMockTransport mockTransport;

ClusterNode(int nodeIndex) {
super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build());
this.nodeIndex = nodeIndex;
localNode = createDiscoveryNode();
persistedState = new InMemoryPersistedState(1L,
clusterState(1L, 1L, localNode, initialConfiguration, initialConfiguration, 0L));
setUp();
onNode(localNode, this::setUp).run();
}

private DiscoveryNode createDiscoveryNode() {
Expand All @@ -206,112 +201,44 @@ private DiscoveryNode createDiscoveryNode() {
}

private void setUp() {
mockTransport = new MockTransport() {
mockTransport = new DisruptableMockTransport(logger) {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) {
assert destination.equals(localNode) == false : "non-local message from " + localNode + " to itself";
super.onSendRequest(requestId, action, request, destination);
protected DiscoveryNode getLocalNode() {
return localNode;
}

@Override
protected ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) {
return ConnectionStatus.CONNECTED;
}

// connecting and handshaking with a new node happens synchronously, so we cannot enqueue these tasks for later
final Consumer<Runnable> scheduler;
@Override
protected Optional<DisruptableMockTransport> getDisruptedCapturingTransport(DiscoveryNode node, String action) {
final Predicate<ClusterNode> matchesDestination;
if (action.equals(HANDSHAKE_ACTION_NAME)) {
scheduler = Runnable::run;
matchesDestination = n -> n.getLocalNode().getAddress().equals(destination.getAddress());
matchesDestination = n -> n.getLocalNode().getAddress().equals(node.getAddress());
} else {
scheduler = deterministicTaskQueue::scheduleNow;
matchesDestination = n -> n.getLocalNode().equals(destination);
matchesDestination = n -> n.getLocalNode().equals(node);
}
return clusterNodes.stream().filter(matchesDestination).findAny().map(cn -> cn.mockTransport);
}

scheduler.accept(new Runnable() {
@Override
public String toString() {
return "delivery of [" + action + "][" + requestId + "]: " + request;
}

@Override
public void run() {
clusterNodes.stream().filter(matchesDestination).findAny().ifPresent(
destinationNode -> {

final RequestHandlerRegistry requestHandler
= destinationNode.mockTransport.getRequestHandler(action);

final TransportChannel transportChannel = new TransportChannel() {
@Override
public String getProfileName() {
return "default";
}

@Override
public String getChannelType() {
return "coordinator-test-channel";
}

@Override
public void sendResponse(final TransportResponse response) {
scheduler.accept(new Runnable() {
@Override
public String toString() {
return "delivery of response " + response
+ " to [" + action + "][" + requestId + "]: " + request;
}

@Override
public void run() {
handleResponse(requestId, response);
}
});
}

@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) {
sendResponse(response);
}

@Override
public void sendResponse(Exception exception) {
scheduler.accept(new Runnable() {
@Override
public String toString() {
return "delivery of error response " + exception.getMessage()
+ " to [" + action + "][" + requestId + "]: " + request;
}

@Override
public void run() {
handleRemoteError(requestId, exception);
}
});
}
};

try {
processMessageReceived(request, requestHandler, transportChannel);
} catch (Exception e) {
scheduler.accept(new Runnable() {
@Override
public String toString() {
return "delivery of processing error response " + e.getMessage()
+ " to [" + action + "][" + requestId + "]: " + request;
}

@Override
public void run() {
handleRemoteError(requestId, e);
}
});
}
}
);
}
});
@Override
protected void handle(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery) {
// handshake needs to run inline as the caller blockingly waits on the result
if (action.equals(HANDSHAKE_ACTION_NAME)) {
onNode(destination, doDelivery).run();
} else {
deterministicTaskQueue.scheduleNow(onNode(destination, doDelivery));
}
}
};

masterService = new FakeThreadPoolMasterService("test", deterministicTaskQueue::scheduleNow);
masterService = new FakeThreadPoolMasterService("test",
runnable -> deterministicTaskQueue.scheduleNow(onNode(localNode, runnable)));
transportService = mockTransport.createTransportService(
settings, deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, a -> localNode, null, emptySet());
settings, deterministicTaskQueue.getThreadPool(runnable -> onNode(localNode, runnable)), NOOP_TRANSPORT_INTERCEPTOR,
a -> localNode, null, emptySet());
coordinator = new Coordinator(settings, transportService, ESAllocationTestCase.createAllocationService(Settings.EMPTY),
masterService, this::getPersistedState, Cluster.this::provideUnicastHosts, Randomness.get());
masterService.setClusterStatePublisher(coordinator);
Expand Down Expand Up @@ -359,9 +286,20 @@ private List<TransportAddress> provideUnicastHosts(HostsResolver ignored) {
}
}

@SuppressWarnings("unchecked")
private static void processMessageReceived(TransportRequest request, RequestHandlerRegistry requestHandler,
TransportChannel transportChannel) throws Exception {
requestHandler.processMessageReceived(request, transportChannel);
private static Runnable onNode(DiscoveryNode node, Runnable runnable) {
final String nodeId = "{" + node.getId() + "}{" + node.getEphemeralId() + "}";
return new Runnable() {
@Override
public void run() {
try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put("nodeId", nodeId)) {
runnable.run();
}
}

@Override
public String toString() {
return nodeId + ": " + runnable.toString();
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class DeterministicTaskQueue extends AbstractComponent {

Expand Down Expand Up @@ -182,6 +183,13 @@ public void advanceTime() {
* @return A <code>ExecutorService</code> that uses this task queue.
*/
public ExecutorService getExecutorService() {
return getExecutorService(Function.identity());
}

/**
* @return A <code>ExecutorService</code> that uses this task queue and wraps <code>Runnable</code>s in the given wrapper.
*/
public ExecutorService getExecutorService(Function<Runnable, Runnable> runnableWrapper) {
return new ExecutorService() {

@Override
Expand Down Expand Up @@ -246,7 +254,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, Ti

@Override
public void execute(Runnable command) {
scheduleNow(command);
scheduleNow(runnableWrapper.apply(command));
}
};
}
Expand All @@ -255,6 +263,13 @@ public void execute(Runnable command) {
* @return A <code>ThreadPool</code> that uses this task queue.
*/
public ThreadPool getThreadPool() {
return getThreadPool(Function.identity());
}

/**
* @return A <code>ThreadPool</code> that uses this task queue and wraps <code>Runnable</code>s in the given wrapper.
*/
public ThreadPool getThreadPool(Function<Runnable, Runnable> runnableWrapper) {
return new ThreadPool(settings) {

{
Expand Down Expand Up @@ -303,12 +318,12 @@ public ThreadPoolStats stats() {

@Override
public ExecutorService generic() {
return getExecutorService();
return getExecutorService(runnableWrapper);
}

@Override
public ExecutorService executor(String name) {
return getExecutorService();
return getExecutorService(runnableWrapper);
}

@Override
Expand All @@ -318,7 +333,7 @@ public ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable co
final int CANCELLED = 2;
final AtomicInteger taskState = new AtomicInteger(NOT_STARTED);

scheduleAt(currentTimeMillis + delay.millis(), new Runnable() {
scheduleAt(currentTimeMillis + delay.millis(), runnableWrapper.apply(new Runnable() {
@Override
public void run() {
if (taskState.compareAndSet(NOT_STARTED, STARTED)) {
Expand All @@ -330,7 +345,7 @@ public void run() {
public String toString() {
return command.toString();
}
});
}));

return new ScheduledFuture<Object>() {
@Override
Expand Down
Loading