SOLR-16505: Switch Recovery/Replication to Jetty HTTP2 (#2276)
UpdateShardHandler:
* switch "recoveryOnlyHttpClient" to Http2SolrClient
RecoveryStrategy:
* Use Http2SolrClient
* Simplify cancellation of the prep recovery command
IndexFetcher:
* Use Http2SolrClient
* Ensure the entire stream is consumed to avoid a connection reset
Http2SolrClient:
* make HttpListenerFactory configurable (used for internal purposes)

---------

Co-authored-by: iamsanjay <[email protected]>
Co-authored-by: David Smiley <[email protected]>
3 people authored May 9, 2024
1 parent be3539d commit e62cb1b
Showing 10 changed files with 589 additions and 120 deletions.
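
For orientation before the per-file diffs: the commit replaces the Apache HttpClient-backed HttpSolrClient used for recovery and index replication with SolrJ's Jetty-based Http2SolrClient. Below is a minimal, illustrative sketch of constructing such a client with the builder methods that appear in this diff; the URL, core name, and timeout values are placeholders, and the real code additionally shares the UpdateShardHandler's recovery-only Jetty client via withHttpClient(...).

import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.Http2SolrClient;

public class RecoveryClientSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder values; in Solr these come from UpdateShardHandlerConfig
    // and the leader replica's core name.
    String leaderBaseUrl = "http://localhost:8983/solr";
    String leaderCoreName = "collection1_shard1_replica_n1";

    try (SolrClient client =
        new Http2SolrClient.Builder(leaderBaseUrl)
            .withDefaultCollection(leaderCoreName)
            .withConnectionTimeout(30_000, TimeUnit.MILLISECONDS)
            // Http2SolrClient has no socket timeout; the idle timeout replaces it.
            .withIdleTimeout(120_000, TimeUnit.MILLISECONDS)
            .build()) {
      // client.request(...) is now backed by Jetty's HTTP/2 client and can be used
      // for prep-recovery, replication-details, and file-content requests.
    }
  }
}
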
3 changes: 3 additions & 0 deletions solr/CHANGES.txt
@@ -130,6 +130,9 @@ Other Changes
---------------------
* SOLR-17248: Refactor ZK related SolrCli tools to separate SolrZkClient and CloudSolrClient instantiation/usage (Lamine Idjeraoui via Eric Pugh)

* SOLR-16505: Use Jetty HTTP2 for index replication and other "recovery" operations
(Sanjay Dutt, David Smiley)

================== 9.6.0 ==================
New Features
---------------------
45 changes: 17 additions & 28 deletions solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -22,17 +22,17 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -124,7 +124,7 @@ public static interface RecoveryListener {
private int retries;
private boolean recoveringAfterStartup;
private CoreContainer cc;
private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
private volatile FutureTask<NamedList<Object>> prevSendPreRecoveryHttpUriRequest;
private final Replica.Type replicaType;

private CoreDescriptor coreDescriptor;
@@ -175,25 +175,18 @@ public final void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
this.recoveringAfterStartup = recoveringAfterStartup;
}

/** Builds a new HttpSolrClient for use in recovery. Caller must close */
private HttpSolrClient.Builder recoverySolrClientBuilder(String baseUrl, String leaderCoreName) {
// workaround for SOLR-13605: get the configured timeouts & set them directly
// (even though getRecoveryOnlyHttpClient() already has them set)
private Http2SolrClient.Builder recoverySolrClientBuilder(String baseUrl, String leaderCoreName) {
final UpdateShardHandlerConfig cfg = cc.getConfig().getUpdateShardHandlerConfig();
return (new HttpSolrClient.Builder(baseUrl)
return new Http2SolrClient.Builder(baseUrl)
.withDefaultCollection(leaderCoreName)
.withConnectionTimeout(cfg.getDistributedConnectionTimeout(), TimeUnit.MILLISECONDS)
.withSocketTimeout(cfg.getDistributedSocketTimeout(), TimeUnit.MILLISECONDS)
.withHttpClient(cc.getUpdateShardHandler().getRecoveryOnlyHttpClient()));
.withHttpClient(cc.getUpdateShardHandler().getRecoveryOnlyHttpClient());
}

// make sure any threads stop retrying
@Override
public final void close() {
close = true;
if (prevSendPreRecoveryHttpUriRequest != null) {
prevSendPreRecoveryHttpUriRequest.abort();
}
cancelPrepRecoveryCmd();
log.warn("Stopping recovery for core=[{}] coreNodeName=[{}]", coreName, coreZkNodeName);
}

@@ -634,11 +627,7 @@ public final void doSyncOrReplicateRecovery(SolrCore core) throws Exception {
.getCollection(cloudDesc.getCollectionName())
.getSlice(cloudDesc.getShardId());

try {
prevSendPreRecoveryHttpUriRequest.abort();
} catch (NullPointerException e) {
// okay
}
cancelPrepRecoveryCmd();

if (isClosed()) {
log.info("RecoveryStrategy has been closed");
@@ -894,7 +883,6 @@ public final boolean isClosed() {

private final void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
throws SolrServerException, IOException, InterruptedException, ExecutionException {

WaitForState prepCmd = new WaitForState();
prepCmd.setCoreName(leaderCoreName);
prepCmd.setNodeName(zkController.getNodeName());
@@ -915,18 +903,19 @@ private final void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreNa
int readTimeout =
conflictWaitMs
+ Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "8000"));
try (HttpSolrClient client =
try (SolrClient client =
recoverySolrClientBuilder(
leaderBaseUrl,
null) // leader core omitted since client only used for 'admin' request
.withSocketTimeout(readTimeout, TimeUnit.MILLISECONDS)
.withIdleTimeout(readTimeout, TimeUnit.MILLISECONDS)
.build()) {
HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd);
prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest;

prevSendPreRecoveryHttpUriRequest = new FutureTask<>(() -> client.request(prepCmd));
log.info("Sending prep recovery command to [{}]; [{}]", leaderBaseUrl, prepCmd);

mrr.future.get();
prevSendPreRecoveryHttpUriRequest.run();
}
}

private void cancelPrepRecoveryCmd() {
Optional.ofNullable(prevSendPreRecoveryHttpUriRequest).ifPresent(req -> req.cancel(true));
}
}
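
The cancellation change above drops HttpUriRequest.abort() and instead wraps the blocking SolrJ call in a FutureTask that can be cancelled with interruption from close(). A self-contained sketch of the pattern, assuming a hypothetical class name and a stand-in task body in place of the real client.request(prepCmd):

import java.util.Optional;
import java.util.concurrent.FutureTask;

public class PrepRecoveryCancelSketch {
  // Holds the in-flight prep-recovery call so another thread (e.g. close()) can cancel it.
  private volatile FutureTask<String> prevPrepRecoveryTask;

  void sendPrepRecoveryCmd() {
    prevPrepRecoveryTask =
        new FutureTask<>(
            () -> {
              // Stand-in for client.request(prepCmd); an interrupt aborts the blocking call.
              Thread.sleep(5_000);
              return "leader reports replica recovering";
            });
    // Executed on the calling thread, mirroring prevSendPreRecoveryHttpUriRequest.run() above.
    prevPrepRecoveryTask.run();
  }

  void cancelPrepRecoveryCmd() {
    // cancel(true) interrupts the thread currently executing the task, if any.
    Optional.ofNullable(prevPrepRecoveryTask).ifPresent(task -> task.cancel(true));
  }
}

In the real RecoveryStrategy the task's result type is NamedList<Object>, and cancellation is invoked both from close() and before a recovery retry, as the diff shows.
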
118 changes: 50 additions & 68 deletions solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -84,7 +84,6 @@
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import java.util.zip.InflaterInputStream;
import org.apache.http.client.HttpClient;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
@@ -97,9 +96,9 @@
import org.apache.lucene.store.IndexOutput;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
import org.apache.solr.client.solrj.impl.InputStreamResponseParser;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
@@ -128,12 +127,12 @@
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.security.AllowListUrlChecker;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.UpdateShardHandler;
import org.apache.solr.util.FileUtils;
import org.apache.solr.util.IndexOutputOutputStream;
import org.apache.solr.util.RTimer;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.stats.InstrumentedHttpRequestExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -186,7 +185,7 @@ public class IndexFetcher {

boolean fetchFromLeader = false;

private final HttpClient myHttpClient;
private final SolrClient solrClient;

private Integer connTimeout;

@@ -261,22 +260,22 @@ public String getMessage() {
}
}

private static HttpClient createHttpClient(
SolrCore core,
String httpBasicAuthUser,
String httpBasicAuthPassword,
boolean useCompression) {
final ModifiableSolrParams httpClientParams = new ModifiableSolrParams();
httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_USER, httpBasicAuthUser);
httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, httpBasicAuthPassword);
httpClientParams.set(HttpClientUtil.PROP_ALLOW_COMPRESSION, useCompression);
// no metrics, just tracing
InstrumentedHttpRequestExecutor executor = new InstrumentedHttpRequestExecutor(null);
return HttpClientUtil.createClient(
httpClientParams,
core.getCoreContainer().getUpdateShardHandler().getRecoveryOnlyConnectionManager(),
true,
executor);
// It's crucial not to remove the authentication credentials, as they are essential for
// user-managed replication.
// GitHub PR #2276
private SolrClient createSolrClient(
SolrCore core, String httpBasicAuthUser, String httpBasicAuthPassword, String leaderBaseUrl) {
final UpdateShardHandler updateShardHandler = core.getCoreContainer().getUpdateShardHandler();
Http2SolrClient httpClient =
new Http2SolrClient.Builder(leaderBaseUrl)
.withHttpClient(updateShardHandler.getRecoveryOnlyHttpClient())
.withListenerFactory(
updateShardHandler.getRecoveryOnlyHttpClient().getListenerFactory())
.withBasicAuthCredentials(httpBasicAuthUser, httpBasicAuthPassword)
.withIdleTimeout(soTimeout, TimeUnit.MILLISECONDS)
.withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS)
.build();
return httpClient;
}

public IndexFetcher(
@@ -318,12 +317,10 @@ public IndexFetcher(
if (soTimeout == -1) {
soTimeout = getParameter(initArgs, HttpClientUtil.PROP_SO_TIMEOUT, 120000, null);
}

String httpBasicAuthUser = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_USER);
String httpBasicAuthPassword = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_PASS);
myHttpClient =
createHttpClient(
solrCore, httpBasicAuthUser, httpBasicAuthPassword, useExternalCompression);
solrClient =
createSolrClient(solrCore, httpBasicAuthUser, httpBasicAuthPassword, leaderBaseUrl);
}

private void setLeaderCoreUrl(String leaderCoreUrl) {
@@ -381,16 +378,10 @@ public NamedList<Object> getLatestVersion() throws IOException {
params.set(CommonParams.WT, JAVABIN);
params.set(CommonParams.QT, ReplicationHandler.PATH);
QueryRequest req = new QueryRequest(params);

req.setBasePath(leaderBaseUrl);
// TODO modify to use shardhandler
try (SolrClient client =
new Builder(leaderBaseUrl)
.withHttpClient(myHttpClient)
.withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS)
.withSocketTimeout(soTimeout, TimeUnit.MILLISECONDS)
.build()) {

return client.request(req, leaderCoreName);
try {
return solrClient.request(req, leaderCoreName);
} catch (SolrServerException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e.getMessage(), e);
}
@@ -408,15 +399,10 @@ private void fetchFileList(long gen) throws IOException {
params.set(CommonParams.WT, JAVABIN);
params.set(CommonParams.QT, ReplicationHandler.PATH);
QueryRequest req = new QueryRequest(params);

req.setBasePath(leaderBaseUrl);
// TODO modify to use shardhandler
try (SolrClient client =
new HttpSolrClient.Builder(leaderBaseUrl)
.withHttpClient(myHttpClient)
.withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS)
.withSocketTimeout(soTimeout, TimeUnit.MILLISECONDS)
.build()) {
NamedList<?> response = client.request(req, leaderCoreName);
try {
NamedList<?> response = solrClient.request(req, leaderCoreName);

List<Map<String, Object>> files = (List<Map<String, Object>>) response.get(CMD_GET_FILE_LIST);
if (files != null) filesToDownload = Collections.synchronizedList(files);
@@ -1805,10 +1791,10 @@ public void fetchFile() throws Exception {
private void fetch() throws Exception {
try {
while (true) {
int result;
try (FastInputStream is = getStream()) {
try (FastInputStream fis = getStream()) {
int result;
// fetch packets one by one in a single request
result = fetchPackets(is);
result = fetchPackets(fis);
if (result == 0 || result == NO_CONTENT) {
return;
}
@@ -1834,18 +1820,25 @@ private int fetchPackets(FastInputStream fis) throws Exception {
byte[] longbytes = new byte[8];
try {
while (true) {
if (fis.peek() == -1) {
if (bytesDownloaded == 0) {
log.warn("No content received for file: {}", fileName);
return NO_CONTENT;
}
return 0;
}
if (stop) {
stop = false;
aborted = true;
throw new ReplicationHandlerException("User aborted replication");
}
long checkSumServer = -1;

fis.readFully(intbytes);
// read the size of the packet
int packetSize = readInt(intbytes);
if (packetSize <= 0) {
log.warn("No content received for file: {}", fileName);
return NO_CONTENT;
continue;
}
// TODO consider recoding the remaining logic to not use/need buf[]; instead use the
// internal buffer of fis
@@ -1879,7 +1872,6 @@ private int fetchPackets(FastInputStream fis) throws Exception {
log.debug("Fetched and wrote {} bytes of file: {}", bytesDownloaded, fileName);
// errorCount is always set to zero after a successful packet
errorCount = 0;
if (bytesDownloaded >= size) return 0;
}
} catch (ReplicationHandlerException e) {
throw e;
@@ -1968,7 +1960,7 @@ private void cleanup() {
private FastInputStream getStream() throws IOException {
ModifiableSolrParams params = new ModifiableSolrParams();

// //the method is command=filecontent
// the method is command=filecontent
params.set(COMMAND, CMD_GET_FILE);
params.set(GENERATION, Long.toString(indexGen));
params.set(CommonParams.QT, ReplicationHandler.PATH);
@@ -1991,17 +1983,13 @@ private FastInputStream getStream() throws IOException {

NamedList<?> response;
InputStream is = null;

// TODO use shardhandler
try (SolrClient client =
new Builder(leaderBaseUrl)
.withHttpClient(myHttpClient)
.withResponseParser(null)
.withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS)
.withSocketTimeout(soTimeout, TimeUnit.MILLISECONDS)
.build()) {
try {
QueryRequest req = new QueryRequest(params);
response = client.request(req, leaderCoreName);
req.setResponseParser(new InputStreamResponseParser(FILE_STREAM));
req.setBasePath(leaderBaseUrl);
if (useExternalCompression) req.addHeader("Accept-Encoding", "gzip");
response = solrClient.request(req, leaderCoreName);
is = (InputStream) response.get("stream");
if (useInternalCompression) {
is = new InflaterInputStream(is);
@@ -2125,21 +2113,15 @@ NamedList<Object> getDetails() throws IOException, SolrServerException {
params.set("follower", false);
params.set(CommonParams.QT, ReplicationHandler.PATH);

QueryRequest request = new QueryRequest(params);
request.setBasePath(leaderBaseUrl);
// TODO use shardhandler
try (SolrClient client =
new HttpSolrClient.Builder(leaderBaseUrl)
.withHttpClient(myHttpClient)
.withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS)
.withSocketTimeout(soTimeout, TimeUnit.MILLISECONDS)
.build()) {
QueryRequest request = new QueryRequest(params);
return client.request(request, leaderCoreName);
}
return solrClient.request(request, leaderCoreName);
}

public void destroy() {
abortFetch();
HttpClientUtil.close(myHttpClient);
IOUtils.closeQuietly(solrClient);
}

String getLeaderCoreUrl() {
(diffs for the remaining changed files not shown)
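
For the IndexFetcher changes above, file content is now fetched through the shared Http2SolrClient by attaching an InputStreamResponseParser to the QueryRequest and reading the returned raw stream; fetchPackets() then consumes the stream to end-of-data so the connection is not reset mid-response. A rough sketch of that request-and-consume pattern, assuming placeholder URLs, core and file names, and literal values standing in for the ReplicationHandler constants (COMMAND, CMD_GET_FILE, FILE_STREAM):

import java.io.InputStream;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.InputStreamResponseParser;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;

public class FileStreamFetchSketch {
  public static void main(String[] args) throws Exception {
    String leaderBaseUrl = "http://localhost:8983/solr"; // placeholder leader URL
    String leaderCoreName = "techproducts";              // placeholder core name

    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("qt", "/replication");     // ReplicationHandler.PATH
    params.set("command", "filecontent"); // CMD_GET_FILE
    params.set("file", "_0.cfs");         // placeholder index file name

    try (SolrClient client = new Http2SolrClient.Builder(leaderBaseUrl).build()) {
      QueryRequest req = new QueryRequest(params);
      // The parser's writer type ("filestream") is sent as the wt parameter and tells
      // SolrJ to hand back the raw response body instead of parsing it.
      req.setResponseParser(new InputStreamResponseParser("filestream"));
      NamedList<Object> rsp = client.request(req, leaderCoreName);

      try (InputStream is = (InputStream) rsp.get("stream")) {
        byte[] buf = new byte[8192];
        long total = 0;
        int n;
        // Drain to end-of-stream: fully consuming the body lets the connection be
        // reused instead of being reset when the stream is abandoned early.
        while ((n = is.read(buf)) != -1) {
          total += n;
        }
        System.out.println("downloaded " + total + " bytes");
      }
    }
  }
}
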
