Skip to content

Commit

Permalink
SOLR-17569: fix flaky test TestLBHttpSolrClient (9.x) / LBHttp2SolrCl…
Browse files Browse the repository at this point in the history
…ientIntegrationTest (main) (#2884)
  • Loading branch information
jdyer1 authored Nov 26, 2024
1 parent 65bf79b commit 656397c
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.ref.WeakReference;
import java.net.ConnectException;
import java.net.MalformedURLException;
Expand Down Expand Up @@ -60,10 +61,18 @@
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.URLUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public abstract class LBSolrClient extends SolrClient {

// Class-level logger; the owning class is resolved via MethodHandles rather than named literally.
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

// Fixed prefix of the debug message logged when the alive server list is rebuilt. It is protected
// (not private) so code outside this class, e.g. tests listening for log output, can match on it.
protected static final String UPDATE_LIVE_SERVER_MESSAGE = "Updated alive server list";

// SLF4J format string built from the prefix above; "{}" receives the rendered alive server list.
private static final String UPDATE_LIVE_SERVER_LOG = UPDATE_LIVE_SERVER_MESSAGE + ": {}";

// defaults
protected static final Set<Integer> RETRY_CODES =
new HashSet<>(Arrays.asList(404, 403, 503, 500));
Expand Down Expand Up @@ -412,6 +421,9 @@ public LBSolrClient(List<Endpoint> solrEndpoints) {
/**
 * Rebuilds the {@code aliveServerList} array from the current values of {@code aliveServers}
 * while holding the {@code aliveServers} monitor, then reports the new list at debug level.
 */
protected void updateAliveList() {
  synchronized (aliveServers) {
    final EndpointWrapper[] refreshed = aliveServers.values().toArray(new EndpointWrapper[0]);
    aliveServerList = refreshed;
    // Guarded so the array is only rendered to a String when debug output is actually wanted.
    if (log.isDebugEnabled()) {
      log.debug(UPDATE_LIVE_SERVER_LOG, Arrays.toString(refreshed));
    }
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.SolrResponseBase;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.embedded.JettyConfig;
import org.apache.solr.embedded.JettySolrRunner;
import org.apache.solr.util.TimeOut;
import org.apache.solr.util.LogListener;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
Expand Down Expand Up @@ -171,10 +170,8 @@ public void testSimple() throws Exception {
assertEquals(2, names.size());
assertFalse(names.contains("solr1"));

// Start the killed server once again
solr[1].startJetty();
// Wait for the alive check to complete
Thread.sleep(1200);
startJettyAndWaitForAliveCheckQuery(solr[1]);

names.clear();
for (int i = 0; i < solr.length; i++) {
resp = h.lbClient.query(solrQuery);
Expand All @@ -198,59 +195,14 @@ public void testTwoServers() throws Exception {
resp = h.lbClient.query(solrQuery);
name = resp.getResults().get(0).getFieldValue("name").toString();
assertEquals("solr/collection11", name);
solr[1].jetty.stop();
solr[1].jetty = null;
solr[0].startJetty();
Thread.sleep(1200);
try {
resp = h.lbClient.query(solrQuery);
} catch (SolrServerException e) {
// try again after a pause in case the error is lack of time to start server
Thread.sleep(3000);
resp = h.lbClient.query(solrQuery);
}
name = resp.getResults().get(0).getFieldValue("name").toString();
assertEquals("solr/collection10", name);
}
}

public void testReliability() throws Exception {
final var baseSolrEndpoints = bootstrapBaseSolrEndpoints(solr.length);
try (var h = client(baseSolrEndpoints)) {

// Kill a server and test again
solr[1].jetty.stop();
solr[1].jetty = null;
startJettyAndWaitForAliveCheckQuery(solr[0]);

// query the servers
for (int i = 0; i < solr.length; i++) {
h.lbClient.query(new SolrQuery("*:*"));
}

// Start the killed server once again
solr[1].startJetty();
// Wait for the alive check to complete
waitForServer(30, h.lbClient, 3, solr[1].name);
}
}

// wait maximum ms for serverName to come back up
private void waitForServer(
int maxSeconds, LBHttp2SolrClient<?> client, int nServers, String serverName)
throws Exception {
final TimeOut timeout = new TimeOut(maxSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeout.hasTimedOut()) {
QueryResponse resp;
try {
resp = client.query(new SolrQuery("*:*"));
} catch (Exception e) {
log.warn("", e);
continue;
}
String name = resp.getResults().get(0).getFieldValue("name").toString();
if (name.equals(serverName)) return;

Thread.sleep(500);
resp = h.lbClient.query(solrQuery);
name = resp.getResults().get(0).getFieldValue("name").toString();
assertEquals("solr/collection10", name);
}
}

Expand All @@ -262,6 +214,16 @@ private LBSolrClient.Endpoint[] bootstrapBaseSolrEndpoints(int max) {
return solrUrls;
}

/**
 * Starts the Jetty of the given instance and then blocks until a debug log message containing
 * {@link LBSolrClient#UPDATE_LIVE_SERVER_MESSAGE} has been captured, failing the test if none
 * shows up within 10 seconds.
 *
 * @param solrInstance the instance whose Jetty should be started
 */
private void startJettyAndWaitForAliveCheckQuery(SolrInstance solrInstance) throws Exception {
  // The listener is opened before Jetty starts so a message logged right away is not missed.
  try (LogListener aliveListWatcher =
      LogListener.debug().substring(LBSolrClient.UPDATE_LIVE_SERVER_MESSAGE)) {
    solrInstance.startJetty();
    final String aliveListMessage = aliveListWatcher.pollMessage(10, TimeUnit.SECONDS);
    if (aliveListMessage == null) {
      fail("The alive check query was not logged within 10 seconds.");
    }
  }
}

private static class SolrInstance {
String name;
File homeDir;
Expand Down
53 changes: 38 additions & 15 deletions solr/test-framework/src/java/org/apache/solr/util/LogListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
package org.apache.solr.util;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.Closeable;
import java.lang.invoke.MethodHandles;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -44,9 +46,9 @@
import org.apache.solr.common.util.SuppressForbidden;

/**
* Helper code to listen for {@link LogEvent} messages (via a {@link Queue}) that you expect as a
* result of the things you are testing, So you can make assertions about when a particular action
* should/shouldn't cause Solr to produce a particular Log message
* Helper code to listen for {@link LogEvent} messages (via a {@link BlockingQueue}) that you expect
* as a result of the things you are testing, so you can make assertions about when a particular
* action should/shouldn't cause Solr to produce a particular Log message.
*
* <p><code>
* // simplest possible usage...
Expand Down Expand Up @@ -278,7 +280,7 @@ public void close() {
*
* @see #getQueue
*/
public LogListener setQueue(Queue<LogEvent> queue) {
public LogListener setQueue(BlockingQueue<LogEvent> queue) {
loggerAppender.setQueue(queue);
return this;
}
Expand Down Expand Up @@ -346,21 +348,22 @@ private void setPredicate(Predicate<String> predicate) {
}

/**
* Direct access to the Queue of Log events that have been recorded, for {@link Queue#poll}ing
* messages or any other inspection/manipulation.
* Direct access to the Queue of Log events that have been recorded, for {@link
* BlockingQueue#poll}ing messages or any other inspection/manipulation.
*
* <p>If a Log event is ever processed but can not be added to this queue (because {@link
* Queue#offer} returns false) then the {@link #close} method of this listener will fail the test.
* BlockingQueue#offer} returns false) then the {@link #close} method of this listener will fail
* the test.
*
* @see #setQueue
* @see #pollMessage
*/
public Queue<LogEvent> getQueue() {
public BlockingQueue<LogEvent> getQueue() {
return loggerAppender.getQueue();
}

/**
* Convinience method for tests that want to assert things about the (formated) message string at
* Convenience method for tests that want to assert things about the (formatted) message string at
* the head of the queue, w/o needing to know/call methods on the underlying {@link LogEvent}
* class.
*
Expand All @@ -373,6 +376,26 @@ public String pollMessage() {
return null == event ? null : event.getMessage().getFormattedMessage();
}

/**
 * Timed variant of {@link #pollMessage()}: takes the event at the head of the queue, waiting up
 * to the given timeout for one to arrive, and returns its formatted message string.
 *
 * <p>If the calling thread is interrupted while waiting, its interrupt flag is restored and the
 * test is failed.
 *
 * @param timeout the duration value
 * @param unit the duration unit
 * @return the formatted message string of the head of the queue, or null if the queue remained
 *     empty until the specified timeout.
 */
public String pollMessage(long timeout, TimeUnit unit) {
  try {
    final LogEvent head = getQueue().poll(timeout, unit);
    return (head != null) ? head.getMessage().getFormattedMessage() : null;
  } catch (InterruptedException ie) {
    Thread.currentThread().interrupt();
    fail("Our thread was interrupted while polling the queue.");
    // fail() always throws; this return only satisfies the compiler's flow analysis.
    return null;
  }
}

/**
* The total number of Log events so far processed by this instance, regardless of whether they
* have already been removed from the queue, or if they could not be added to the queue due to
Expand Down Expand Up @@ -598,7 +621,7 @@ public Result filter(LogEvent event) {
private static final class QueueAppender extends AbstractAppender {

// may be mutated in main thread while background thread is actively logging
private final AtomicReference<Queue<LogEvent>> queue =
private final AtomicReference<BlockingQueue<LogEvent>> queue =
new AtomicReference<>(new ArrayBlockingQueue<>(100));
final AtomicInteger count = new AtomicInteger(0);
final AtomicInteger capacityExceeded = new AtomicInteger(0);
Expand All @@ -611,7 +634,7 @@ public QueueAppender(final String name) {

@Override
public void append(final LogEvent event) {
final Queue<LogEvent> q = queue.get(); // read from reference once
final BlockingQueue<LogEvent> q = queue.get(); // read from reference once
final LogEvent memento =
(event instanceof MutableLogEvent) ? ((MutableLogEvent) event).createMemento() : event;
final int currentCount = count.incrementAndGet();
Expand Down Expand Up @@ -648,20 +671,20 @@ public int getCount() {
* Returns the number of times this appender was unable to queue a LogEvent due to exceeding
* capacity
*
* @see Queue#offer
* @see BlockingQueue#offer
*/
public int getNumCapacityExceeded() {
return capacityExceeded.get();
}

/** Changes the queue that will be used for any future events that are appended */
public void setQueue(final Queue<LogEvent> q) {
public void setQueue(final BlockingQueue<LogEvent> q) {
assert null != q;
this.queue.set(q);
}

/** Returns Raw access to the (current) queue */
public Queue<LogEvent> getQueue() {
public BlockingQueue<LogEvent> getQueue() {
return queue.get();
}
}
Expand Down

0 comments on commit 656397c

Please sign in to comment.