diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 751e5a9fb08..a507c1026f2 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -130,6 +130,9 @@ Other Changes --------------------- * SOLR-17248: Refactor ZK related SolrCli tools to separate SolrZkClient and CloudSolrClient instantiation/usage (Lamine Idjeraoui via Eric Pugh) +* SOLR-16505: Use Jetty HTTP2 for index replication and other "recovery" operations + (Sanjay Dutt, David Smiley) + ================== 9.6.0 ================== New Features --------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java index 9ea837378d7..d9453757dcc 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java @@ -22,17 +22,17 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; -import org.apache.http.client.methods.HttpUriRequest; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.store.Directory; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.impl.HttpSolrClient; -import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse; +import org.apache.solr.client.solrj.impl.Http2SolrClient; import org.apache.solr.client.solrj.request.AbstractUpdateRequest; import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState; import org.apache.solr.client.solrj.request.UpdateRequest; @@ -124,7 +124,7 @@ public static interface RecoveryListener { private int retries; private boolean recoveringAfterStartup; private CoreContainer cc; - private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest; + private volatile FutureTask> prevSendPreRecoveryHttpUriRequest; private final Replica.Type replicaType; private CoreDescriptor coreDescriptor; @@ -175,25 +175,18 @@ public final void setRecoveringAfterStartup(boolean recoveringAfterStartup) { this.recoveringAfterStartup = recoveringAfterStartup; } - /** Builds a new HttpSolrClient for use in recovery. Caller must close */ - private HttpSolrClient.Builder recoverySolrClientBuilder(String baseUrl, String leaderCoreName) { - // workaround for SOLR-13605: get the configured timeouts & set them directly - // (even though getRecoveryOnlyHttpClient() already has them set) + private Http2SolrClient.Builder recoverySolrClientBuilder(String baseUrl, String leaderCoreName) { final UpdateShardHandlerConfig cfg = cc.getConfig().getUpdateShardHandlerConfig(); - return (new HttpSolrClient.Builder(baseUrl) + return new Http2SolrClient.Builder(baseUrl) .withDefaultCollection(leaderCoreName) - .withConnectionTimeout(cfg.getDistributedConnectionTimeout(), TimeUnit.MILLISECONDS) - .withSocketTimeout(cfg.getDistributedSocketTimeout(), TimeUnit.MILLISECONDS) - .withHttpClient(cc.getUpdateShardHandler().getRecoveryOnlyHttpClient())); + .withHttpClient(cc.getUpdateShardHandler().getRecoveryOnlyHttpClient()); } // make sure any threads stop retrying @Override public final void close() { close = true; - if (prevSendPreRecoveryHttpUriRequest != null) { - prevSendPreRecoveryHttpUriRequest.abort(); - } + cancelPrepRecoveryCmd(); log.warn("Stopping recovery for core=[{}] coreNodeName=[{}]", coreName, coreZkNodeName); } @@ -634,11 +627,7 @@ public final void doSyncOrReplicateRecovery(SolrCore core) throws Exception { .getCollection(cloudDesc.getCollectionName()) .getSlice(cloudDesc.getShardId()); - try { - prevSendPreRecoveryHttpUriRequest.abort(); - } catch (NullPointerException e) { - // okay - } + cancelPrepRecoveryCmd(); if (isClosed()) { log.info("RecoveryStrategy has been closed"); @@ -894,7 +883,6 @@ public final boolean isClosed() { private final void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice) throws SolrServerException, IOException, InterruptedException, ExecutionException { - WaitForState prepCmd = new WaitForState(); prepCmd.setCoreName(leaderCoreName); prepCmd.setNodeName(zkController.getNodeName()); @@ -915,18 +903,19 @@ private final void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreNa int readTimeout = conflictWaitMs + Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "8000")); - try (HttpSolrClient client = + try (SolrClient client = recoverySolrClientBuilder( leaderBaseUrl, null) // leader core omitted since client only used for 'admin' request - .withSocketTimeout(readTimeout, TimeUnit.MILLISECONDS) + .withIdleTimeout(readTimeout, TimeUnit.MILLISECONDS) .build()) { - HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd); - prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest; - + prevSendPreRecoveryHttpUriRequest = new FutureTask<>(() -> client.request(prepCmd)); log.info("Sending prep recovery command to [{}]; [{}]", leaderBaseUrl, prepCmd); - - mrr.future.get(); + prevSendPreRecoveryHttpUriRequest.run(); } } + + private void cancelPrepRecoveryCmd() { + Optional.ofNullable(prevSendPreRecoveryHttpUriRequest).ifPresent(req -> req.cancel(true)); + } } diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java index b347565450a..69a2ddda8bb 100644 --- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java +++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java @@ -84,7 +84,6 @@ import java.util.zip.Adler32; import java.util.zip.Checksum; import java.util.zip.InflaterInputStream; -import org.apache.http.client.HttpClient; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexWriter; @@ -97,9 +96,9 @@ import org.apache.lucene.store.IndexOutput; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.Http2SolrClient; import org.apache.solr.client.solrj.impl.HttpClientUtil; -import org.apache.solr.client.solrj.impl.HttpSolrClient; -import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder; +import org.apache.solr.client.solrj.impl.InputStreamResponseParser; import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.cloud.CloudDescriptor; import org.apache.solr.cloud.ZkController; @@ -128,12 +127,12 @@ import org.apache.solr.search.SolrIndexSearcher; import org.apache.solr.security.AllowListUrlChecker; import org.apache.solr.update.CommitUpdateCommand; +import org.apache.solr.update.UpdateShardHandler; import org.apache.solr.util.FileUtils; import org.apache.solr.util.IndexOutputOutputStream; import org.apache.solr.util.RTimer; import org.apache.solr.util.RefCounted; import org.apache.solr.util.TestInjection; -import org.apache.solr.util.stats.InstrumentedHttpRequestExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -186,7 +185,7 @@ public class IndexFetcher { boolean fetchFromLeader = false; - private final HttpClient myHttpClient; + private final SolrClient solrClient; private Integer connTimeout; @@ -261,22 +260,22 @@ public String getMessage() { } } - private static HttpClient createHttpClient( - SolrCore core, - String httpBasicAuthUser, - String httpBasicAuthPassword, - boolean useCompression) { - final ModifiableSolrParams httpClientParams = new ModifiableSolrParams(); - httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_USER, httpBasicAuthUser); - httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, httpBasicAuthPassword); - httpClientParams.set(HttpClientUtil.PROP_ALLOW_COMPRESSION, useCompression); - // no metrics, just tracing - InstrumentedHttpRequestExecutor executor = new InstrumentedHttpRequestExecutor(null); - return HttpClientUtil.createClient( - httpClientParams, - core.getCoreContainer().getUpdateShardHandler().getRecoveryOnlyConnectionManager(), - true, - executor); + // It's crucial not to remove the authentication credentials as they are essential for User + // managed replication. + // GitHub PR #2276 + private SolrClient createSolrClient( + SolrCore core, String httpBasicAuthUser, String httpBasicAuthPassword, String leaderBaseUrl) { + final UpdateShardHandler updateShardHandler = core.getCoreContainer().getUpdateShardHandler(); + Http2SolrClient httpClient = + new Http2SolrClient.Builder(leaderBaseUrl) + .withHttpClient(updateShardHandler.getRecoveryOnlyHttpClient()) + .withListenerFactory( + updateShardHandler.getRecoveryOnlyHttpClient().getListenerFactory()) + .withBasicAuthCredentials(httpBasicAuthUser, httpBasicAuthPassword) + .withIdleTimeout(soTimeout, TimeUnit.MILLISECONDS) + .withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS) + .build(); + return httpClient; } public IndexFetcher( @@ -318,12 +317,10 @@ public IndexFetcher( if (soTimeout == -1) { soTimeout = getParameter(initArgs, HttpClientUtil.PROP_SO_TIMEOUT, 120000, null); } - String httpBasicAuthUser = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_USER); String httpBasicAuthPassword = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_PASS); - myHttpClient = - createHttpClient( - solrCore, httpBasicAuthUser, httpBasicAuthPassword, useExternalCompression); + solrClient = + createSolrClient(solrCore, httpBasicAuthUser, httpBasicAuthPassword, leaderBaseUrl); } private void setLeaderCoreUrl(String leaderCoreUrl) { @@ -381,16 +378,10 @@ public NamedList getLatestVersion() throws IOException { params.set(CommonParams.WT, JAVABIN); params.set(CommonParams.QT, ReplicationHandler.PATH); QueryRequest req = new QueryRequest(params); - + req.setBasePath(leaderBaseUrl); // TODO modify to use shardhandler - try (SolrClient client = - new Builder(leaderBaseUrl) - .withHttpClient(myHttpClient) - .withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS) - .withSocketTimeout(soTimeout, TimeUnit.MILLISECONDS) - .build()) { - - return client.request(req, leaderCoreName); + try { + return solrClient.request(req, leaderCoreName); } catch (SolrServerException e) { throw new SolrException(ErrorCode.SERVER_ERROR, e.getMessage(), e); } @@ -408,15 +399,10 @@ private void fetchFileList(long gen) throws IOException { params.set(CommonParams.WT, JAVABIN); params.set(CommonParams.QT, ReplicationHandler.PATH); QueryRequest req = new QueryRequest(params); - + req.setBasePath(leaderBaseUrl); // TODO modify to use shardhandler - try (SolrClient client = - new HttpSolrClient.Builder(leaderBaseUrl) - .withHttpClient(myHttpClient) - .withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS) - .withSocketTimeout(soTimeout, TimeUnit.MILLISECONDS) - .build()) { - NamedList response = client.request(req, leaderCoreName); + try { + NamedList response = solrClient.request(req, leaderCoreName); List> files = (List>) response.get(CMD_GET_FILE_LIST); if (files != null) filesToDownload = Collections.synchronizedList(files); @@ -1805,10 +1791,10 @@ public void fetchFile() throws Exception { private void fetch() throws Exception { try { while (true) { - int result; - try (FastInputStream is = getStream()) { + try (FastInputStream fis = getStream()) { + int result; // fetch packets one by one in a single request - result = fetchPackets(is); + result = fetchPackets(fis); if (result == 0 || result == NO_CONTENT) { return; } @@ -1834,18 +1820,25 @@ private int fetchPackets(FastInputStream fis) throws Exception { byte[] longbytes = new byte[8]; try { while (true) { + if (fis.peek() == -1) { + if (bytesDownloaded == 0) { + log.warn("No content received for file: {}", fileName); + return NO_CONTENT; + } + return 0; + } if (stop) { stop = false; aborted = true; throw new ReplicationHandlerException("User aborted replication"); } long checkSumServer = -1; + fis.readFully(intbytes); // read the size of the packet int packetSize = readInt(intbytes); if (packetSize <= 0) { - log.warn("No content received for file: {}", fileName); - return NO_CONTENT; + continue; } // TODO consider recoding the remaining logic to not use/need buf[]; instead use the // internal buffer of fis @@ -1879,7 +1872,6 @@ private int fetchPackets(FastInputStream fis) throws Exception { log.debug("Fetched and wrote {} bytes of file: {}", bytesDownloaded, fileName); // errorCount is always set to zero after a successful packet errorCount = 0; - if (bytesDownloaded >= size) return 0; } } catch (ReplicationHandlerException e) { throw e; @@ -1968,7 +1960,7 @@ private void cleanup() { private FastInputStream getStream() throws IOException { ModifiableSolrParams params = new ModifiableSolrParams(); - // //the method is command=filecontent + // the method is command=filecontent params.set(COMMAND, CMD_GET_FILE); params.set(GENERATION, Long.toString(indexGen)); params.set(CommonParams.QT, ReplicationHandler.PATH); @@ -1991,17 +1983,13 @@ private FastInputStream getStream() throws IOException { NamedList response; InputStream is = null; - // TODO use shardhandler - try (SolrClient client = - new Builder(leaderBaseUrl) - .withHttpClient(myHttpClient) - .withResponseParser(null) - .withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS) - .withSocketTimeout(soTimeout, TimeUnit.MILLISECONDS) - .build()) { + try { QueryRequest req = new QueryRequest(params); - response = client.request(req, leaderCoreName); + req.setResponseParser(new InputStreamResponseParser(FILE_STREAM)); + req.setBasePath(leaderBaseUrl); + if (useExternalCompression) req.addHeader("Accept-Encoding", "gzip"); + response = solrClient.request(req, leaderCoreName); is = (InputStream) response.get("stream"); if (useInternalCompression) { is = new InflaterInputStream(is); @@ -2125,21 +2113,15 @@ NamedList getDetails() throws IOException, SolrServerException { params.set("follower", false); params.set(CommonParams.QT, ReplicationHandler.PATH); + QueryRequest request = new QueryRequest(params); + request.setBasePath(leaderBaseUrl); // TODO use shardhandler - try (SolrClient client = - new HttpSolrClient.Builder(leaderBaseUrl) - .withHttpClient(myHttpClient) - .withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS) - .withSocketTimeout(soTimeout, TimeUnit.MILLISECONDS) - .build()) { - QueryRequest request = new QueryRequest(params); - return client.request(request, leaderCoreName); - } + return solrClient.request(request, leaderCoreName); } public void destroy() { abortFetch(); - HttpClientUtil.close(myHttpClient); + IOUtils.closeQuietly(solrClient); } String getLeaderCoreUrl() { diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java index b3ab8cb9156..650a61dc41d 100644 --- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java +++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java @@ -75,17 +75,15 @@ public class UpdateShardHandler implements SolrInfoBean { private final Http2SolrClient updateOnlyClient; - private final CloseableHttpClient recoveryOnlyClient; + private final Http2SolrClient recoveryOnlyClient; private final CloseableHttpClient defaultClient; - private final InstrumentedPoolingHttpClientConnectionManager recoveryOnlyConnectionManager; - private final InstrumentedPoolingHttpClientConnectionManager defaultConnectionManager; private final InstrumentedHttpRequestExecutor httpRequestExecutor; - private final InstrumentedHttpListenerFactory updateHttpListenerFactory; + private final InstrumentedHttpListenerFactory trackHttpSolrMetrics; private SolrMetricsContext solrMetricsContext; @@ -93,16 +91,11 @@ public class UpdateShardHandler implements SolrInfoBean { private int connectionTimeout = HttpClientUtil.DEFAULT_CONNECT_TIMEOUT; public UpdateShardHandler(UpdateShardHandlerConfig cfg) { - recoveryOnlyConnectionManager = - new InstrumentedPoolingHttpClientConnectionManager( - HttpClientUtil.getSocketFactoryRegistryProvider().getSocketFactoryRegistry()); defaultConnectionManager = new InstrumentedPoolingHttpClientConnectionManager( HttpClientUtil.getSocketFactoryRegistryProvider().getSocketFactoryRegistry()); ModifiableSolrParams clientParams = new ModifiableSolrParams(); if (cfg != null) { - recoveryOnlyConnectionManager.setMaxTotal(cfg.getMaxUpdateConnections()); - recoveryOnlyConnectionManager.setDefaultMaxPerRoute(cfg.getMaxUpdateConnectionsPerHost()); defaultConnectionManager.setMaxTotal(cfg.getMaxUpdateConnections()); defaultConnectionManager.setDefaultMaxPerRoute(cfg.getMaxUpdateConnectionsPerHost()); clientParams.set(HttpClientUtil.PROP_SO_TIMEOUT, cfg.getDistributedSocketTimeout()); @@ -120,10 +113,8 @@ public UpdateShardHandler(UpdateShardHandlerConfig cfg) { log.debug("Created default UpdateShardHandler HTTP client with params: {}", clientParams); httpRequestExecutor = new InstrumentedHttpRequestExecutor(getMetricNameStrategy(cfg)); - updateHttpListenerFactory = new InstrumentedHttpListenerFactory(getNameStrategy(cfg)); - recoveryOnlyClient = - HttpClientUtil.createClient( - clientParams, recoveryOnlyConnectionManager, false, httpRequestExecutor); + trackHttpSolrMetrics = new InstrumentedHttpListenerFactory(getNameStrategy(cfg)); + defaultClient = HttpClientUtil.createClient( clientParams, defaultConnectionManager, false, httpRequestExecutor); @@ -133,15 +124,24 @@ public UpdateShardHandler(UpdateShardHandlerConfig cfg) { DistributedUpdateProcessor.DISTRIB_FROM, DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM); Http2SolrClient.Builder updateOnlyClientBuilder = new Http2SolrClient.Builder(); + Http2SolrClient.Builder recoveryOnlyClientBuilder = new Http2SolrClient.Builder(); if (cfg != null) { updateOnlyClientBuilder .withConnectionTimeout(cfg.getDistributedConnectionTimeout(), TimeUnit.MILLISECONDS) .withIdleTimeout(cfg.getDistributedSocketTimeout(), TimeUnit.MILLISECONDS) .withMaxConnectionsPerHost(cfg.getMaxUpdateConnectionsPerHost()); + recoveryOnlyClientBuilder + .withConnectionTimeout(cfg.getDistributedConnectionTimeout(), TimeUnit.MILLISECONDS) + .withIdleTimeout(cfg.getDistributedSocketTimeout(), TimeUnit.MILLISECONDS) + .withMaxConnectionsPerHost(cfg.getMaxUpdateConnectionsPerHost()); } + updateOnlyClientBuilder.withTheseParamNamesInTheUrl(urlParamNames); updateOnlyClient = updateOnlyClientBuilder.build(); - updateOnlyClient.addListenerFactory(updateHttpListenerFactory); + updateOnlyClient.addListenerFactory(trackHttpSolrMetrics); + + recoveryOnlyClient = recoveryOnlyClientBuilder.build(); + recoveryOnlyClient.addListenerFactory(trackHttpSolrMetrics); ThreadFactory recoveryThreadFactory = new SolrNamedThreadFactory("recoveryExecutor"); if (cfg != null && cfg.getMaxRecoveryThreads() > 0) { @@ -205,7 +205,7 @@ public String getName() { public void initializeMetrics(SolrMetricsContext parentContext, String scope) { solrMetricsContext = parentContext.getChildContext(this); String expandedScope = SolrMetricManager.mkName(scope, getCategory().name()); - updateHttpListenerFactory.initializeMetrics(solrMetricsContext, expandedScope); + trackHttpSolrMetrics.initializeMetrics(solrMetricsContext, expandedScope); defaultConnectionManager.initializeMetrics(solrMetricsContext, expandedScope); updateExecutor = MetricUtils.instrumentedExecutorService( @@ -247,7 +247,7 @@ public Http2SolrClient getUpdateOnlyHttpClient() { } // don't introduce a bug, this client is for recovery ops only! - public HttpClient getRecoveryOnlyHttpClient() { + public Http2SolrClient getRecoveryOnlyHttpClient() { return recoveryOnlyClient; } @@ -264,10 +264,6 @@ public PoolingHttpClientConnectionManager getDefaultConnectionManager() { return defaultConnectionManager; } - public PoolingHttpClientConnectionManager getRecoveryOnlyConnectionManager() { - return recoveryOnlyConnectionManager; - } - /** * @return executor for recovery operations */ @@ -290,10 +286,9 @@ public void close() { // do nothing } IOUtils.closeQuietly(updateOnlyClient); - HttpClientUtil.close(recoveryOnlyClient); + IOUtils.closeQuietly(recoveryOnlyClient); HttpClientUtil.close(defaultClient); defaultConnectionManager.close(); - recoveryOnlyConnectionManager.close(); } } @@ -309,5 +304,6 @@ public int getConnectionTimeout() { public void setSecurityBuilder(HttpClientBuilderPlugin builder) { builder.setup(updateOnlyClient); + builder.setup(recoveryOnlyClient); } } diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-follower-auth.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-follower-auth.xml new file mode 100644 index 00000000000..1635cfb099b --- /dev/null +++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-follower-auth.xml @@ -0,0 +1,61 @@ + + + + + + ${tests.luceneMatchVersion:LATEST} + + + ${solr.data.dir:} + + + + + + + + true + + + + + + + + + + + + + + http://127.0.0.1:TEST_PORT/solr/collection1 + 00:00:01 + COMPRESSION + solr + SolrRocks + + + + + + + max-age=30, public + + + + diff --git a/solr/core/src/test/org/apache/solr/cloud/RecoveryStrategyStressTest.java b/solr/core/src/test/org/apache/solr/cloud/RecoveryStrategyStressTest.java new file mode 100644 index 00000000000..06c03971a96 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/RecoveryStrategyStressTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud; + +import com.carrotsearch.randomizedtesting.annotations.Nightly; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.CloudLegacySolrClient; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.util.SolrNamedThreadFactory; +import org.apache.solr.embedded.JettySolrRunner; +import org.junit.BeforeClass; +import org.junit.Test; + +@Nightly +public class RecoveryStrategyStressTest extends SolrCloudTestCase { + + @BeforeClass + public static void setupCluster() throws Exception { + cluster = configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure(); + } + + @Test + public void stressTestRecovery() throws Exception { + final String collection = "recoveryStressTest"; + CollectionAdminRequest.createCollection(collection, "conf", 1, 4) + .process(cluster.getSolrClient()); + waitForState( + "Expected a collection with one shard and two replicas", collection, clusterShape(1, 4)); + final var scheduledExecutorService = + Executors.newScheduledThreadPool(1, new SolrNamedThreadFactory("stressTestRecovery")); + try (SolrClient solrClient = + cluster.basicSolrClientBuilder().withDefaultCollection(collection).build()) { + final StoppableIndexingThread indexThread = + new StoppableIndexingThread(null, solrClient, "1", true, 10, 1, true); + + final var startAndStopCount = new CountDownLatch(50); + final Thread startAndStopRandomReplicas = + new Thread( + () -> { + try { + while (startAndStopCount.getCount() > 0) { + DocCollection state = getCollectionState(collection); + Replica leader = state.getLeader("shard1"); + Replica replica = + getRandomReplica(state.getSlice("shard1"), (r) -> !leader.equals(r)); + + JettySolrRunner jetty = cluster.getReplicaJetty(replica); + jetty.stop(); + Thread.sleep(100); + jetty.start(); + startAndStopCount.countDown(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + startAndStopRandomReplicas.start(); + // index and commit doc after fixed interval of 10 sec + scheduledExecutorService.scheduleWithFixedDelay( + indexThread, 1000, 10000, TimeUnit.MILLISECONDS); + scheduledExecutorService.scheduleWithFixedDelay( + () -> { + try { + new UpdateRequest().commit(solrClient, collection); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (SolrServerException e) { + throw new RuntimeException(e); + } + }, + 100, + 10000, + TimeUnit.MILLISECONDS); + + startAndStopCount.await(); + scheduledExecutorService.shutdownNow(); + // final commit to make documents visible for replicas + new UpdateRequest().commit(solrClient, collection); + } + cluster.getZkStateReader().waitForState(collection, 120, TimeUnit.SECONDS, clusterShape(1, 4)); + + // test that leader and replica have same doc count + DocCollection state = getCollectionState(collection); + assertShardConsistency(state.getSlice("shard1"), true); + } + + private void assertShardConsistency(Slice shard, boolean expectDocs) throws Exception { + List replicas = shard.getReplicas(r -> r.getState() == Replica.State.ACTIVE); + long[] numCounts = new long[replicas.size()]; + int i = 0; + for (Replica replica : replicas) { + try (var client = + new HttpSolrClient.Builder(replica.getBaseUrl()) + .withDefaultCollection(replica.getCoreName()) + .withHttpClient(((CloudLegacySolrClient) cluster.getSolrClient()).getHttpClient()) + .build()) { + numCounts[i] = + client.query(new SolrQuery("*:*").add("distrib", "false")).getResults().getNumFound(); + i++; + } + } + for (int j = 1; j < replicas.size(); j++) { + if (numCounts[j] != numCounts[j - 1]) + fail("Mismatch in counts between replicas"); // TODO improve this! + if (numCounts[j] == 0 && expectDocs) + fail("Expected docs on shard " + shard.getName() + " but found none"); + } + } +} diff --git a/solr/core/src/test/org/apache/solr/handler/ReplicationTestHelper.java b/solr/core/src/test/org/apache/solr/handler/ReplicationTestHelper.java index 892dc679d3f..a9c81429467 100644 --- a/solr/core/src/test/org/apache/solr/handler/ReplicationTestHelper.java +++ b/solr/core/src/test/org/apache/solr/handler/ReplicationTestHelper.java @@ -103,7 +103,8 @@ private static void copyFile(File src, File dst, Integer port, boolean internalC if (null != port) { line = line.replace("TEST_PORT", port.toString()); } - line = line.replace("COMPRESSION", internalCompression ? "internal" : "false"); + String externalCompression = LuceneTestCase.random().nextBoolean() ? "external" : "false"; + line = line.replace("COMPRESSION", internalCompression ? "internal" : externalCompression); out.write(line); } } diff --git a/solr/core/src/test/org/apache/solr/handler/TestUserManagedReplicationWithAuth.java b/solr/core/src/test/org/apache/solr/handler/TestUserManagedReplicationWithAuth.java new file mode 100644 index 00000000000..b230aad2021 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/handler/TestUserManagedReplicationWithAuth.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.handler; + +import static org.apache.solr.common.params.CommonParams.JAVABIN; +import static org.apache.solr.handler.ReplicationHandler.CMD_DISABLE_POLL; +import static org.apache.solr.handler.ReplicationHandler.CMD_FETCH_INDEX; +import static org.apache.solr.handler.ReplicationHandler.COMMAND; +import static org.apache.solr.handler.ReplicationTestHelper.createAndStartJetty; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.SolrTestCaseJ4.SuppressSSL; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.SolrResponse; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.HealthCheckRequest; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.embedded.JettySolrRunner; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +@SuppressSSL +public class TestUserManagedReplicationWithAuth extends SolrTestCaseJ4 { + JettySolrRunner leaderJetty, followerJetty, followerJettyWithAuth; + SolrClient leaderClient, followerClient, followerClientWithAuth; + ReplicationTestHelper.SolrInstance leader = null, follower = null, followerWithAuth = null; + + private static String user = "solr"; + private static String pass = "SolrRocks"; + private static String securityJson = + "{\n" + + "\"authentication\":{ \n" + + " \"blockUnknown\": true, \n" + + " \"class\":\"solr.BasicAuthPlugin\",\n" + + " \"credentials\":{\"solr\":\"IV0EHq1OnNrj6gvRCwvFwTrZ1+z1oBbnQdiVC3otuq0= Ndd7LKvVBAaZIF0QAVi1ekCfAJXr1GGfLtRUXhgrF8c=\"}, \n" + + " \"realm\":\"My Solr users\", \n" + + " \"forwardCredentials\": false \n" + + "},\n" + + "\"authorization\":{\n" + + " \"class\":\"solr.RuleBasedAuthorizationPlugin\",\n" + + " \"permissions\":[{\"name\":\"security-edit\",\n" + + " \"role\":\"admin\"}],\n" + + " \"user-role\":{\"solr\":\"admin\"}\n" + + "}}"; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + systemSetPropertySolrDisableUrlAllowList("true"); + // leader with Basic auth enabled via security.json + leader = + new ReplicationTestHelper.SolrInstance( + createTempDir("solr-instance").toFile(), "leader", null); + leader.setUp(); + // Configuring basic auth for Leader + Path solrLeaderHome = Path.of(leader.getHomeDir()); + Files.write( + solrLeaderHome.resolve("security.json"), securityJson.getBytes(StandardCharsets.UTF_8)); + leaderJetty = ReplicationTestHelper.createAndStartJetty(leader); + leaderClient = + ReplicationTestHelper.createNewSolrClient( + buildUrl(leaderJetty.getLocalPort()), DEFAULT_TEST_CORENAME); + + // follower with no basic auth credentials for leader configured. + follower = + new ReplicationTestHelper.SolrInstance( + createTempDir("solr-instance").toFile(), "follower", leaderJetty.getLocalPort()); + follower.setUp(); + followerJetty = createAndStartJetty(follower); + followerClient = + ReplicationTestHelper.createNewSolrClient( + buildUrl(followerJetty.getLocalPort()), DEFAULT_TEST_CORENAME); + + // follower with basic auth credentials for leader configured in solrconfig.xml. + followerWithAuth = + new ReplicationTestHelper.SolrInstance( + createTempDir("solr-instance").toFile(), "follower-auth", leaderJetty.getLocalPort()); + followerWithAuth.setUp(); + followerJettyWithAuth = createAndStartJetty(followerWithAuth); + followerClientWithAuth = + ReplicationTestHelper.createNewSolrClient( + buildUrl(followerJettyWithAuth.getLocalPort()), DEFAULT_TEST_CORENAME); + } + + @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + if (null != leaderJetty) { + leaderJetty.stop(); + leaderJetty = null; + } + if (null != followerJetty) { + followerJetty.stop(); + followerJetty = null; + } + if (null != followerJettyWithAuth) { + followerJettyWithAuth.stop(); + followerJettyWithAuth = null; + } + if (null != leaderClient) { + leaderClient.close(); + leaderClient = null; + } + if (null != followerClient) { + followerClient.close(); + followerClient = null; + } + if (null != followerClientWithAuth) { + followerClientWithAuth.close(); + followerClientWithAuth = null; + } + } + + private > T withBasicAuth(T req) { + req.setBasicAuthCredentials(user, pass); + return req; + } + + @Test + public void doTestManualFetchIndexWithAuthEnabled() throws Exception { + disablePoll(followerJetty, followerClient); + int nDocs = 500; + int docsAdded = 0; + + UpdateRequest commitReq = new UpdateRequest(); + withBasicAuth(commitReq); + for (int i = 0; docsAdded < nDocs / 2; i++, docsAdded++) { + SolrInputDocument doc = new SolrInputDocument(); + String[] fields = {"id", i + "", "name", "name = " + i}; + for (int j = 0; j < fields.length; j += 2) { + doc.addField(fields[j], fields[j + 1]); + } + UpdateRequest req = new UpdateRequest(); + withBasicAuth(req).add(doc); + req.process(leaderClient, DEFAULT_TEST_CORENAME); + if (i % 10 == 0) { + commitReq.commit(leaderClient, DEFAULT_TEST_CORENAME); + } + } + commitReq.commit(leaderClient, DEFAULT_TEST_CORENAME); + + assertEquals( + docsAdded, + queryWithBasicAuth(leaderClient, new SolrQuery("*:*")).getResults().getNumFound()); + + // Without Auth credentials fetchIndex will fail + pullIndexFromTo(leaderJetty, followerJetty, false); + assertNotEquals( + docsAdded, + queryWithBasicAuth(followerClient, new SolrQuery("*:*")).getResults().getNumFound()); + + // With Auth credentials + pullIndexFromTo(leaderJetty, followerJetty, true); + assertEquals( + docsAdded, + queryWithBasicAuth(followerClient, new SolrQuery("*:*")).getResults().getNumFound()); + } + + @Test + public void doTestAutoReplicationWithAuthEnabled() throws Exception { + int nDocs = 250; + UpdateRequest commitReq = new UpdateRequest(); + withBasicAuth(commitReq); + for (int i = 0; i < nDocs; i++) { + SolrInputDocument doc = new SolrInputDocument(); + String[] fields = {"id", i + "", "name", "name = " + i}; + for (int j = 0; j < fields.length; j += 2) { + doc.addField(fields[j], fields[j + 1]); + } + UpdateRequest req = new UpdateRequest(); + withBasicAuth(req).add(doc); + req.process(leaderClient, DEFAULT_TEST_CORENAME); + if (i % 10 == 0) { + commitReq.commit(leaderClient, DEFAULT_TEST_CORENAME); + } + } + commitReq.commit(leaderClient, DEFAULT_TEST_CORENAME); + // wait for followers to fetchIndex + Thread.sleep(5000); + // follower with auth should be healthy + HealthCheckRequest healthCheckRequestFollower = new HealthCheckRequest(); + healthCheckRequestFollower.setMaxGenerationLag(2); + assertEquals( + CommonParams.OK, + healthCheckRequestFollower + .process(followerClientWithAuth) + .getResponse() + .get(CommonParams.STATUS)); + // follower with auth should be unhealthy + healthCheckRequestFollower = new HealthCheckRequest(); + healthCheckRequestFollower.setMaxGenerationLag(2); + assertEquals( + CommonParams.FAILURE, + healthCheckRequestFollower.process(followerClient).getResponse().get(CommonParams.STATUS)); + } + + private QueryResponse queryWithBasicAuth(SolrClient client, SolrQuery q) + throws IOException, SolrServerException { + return withBasicAuth(new QueryRequest(q)).process(client); + } + + private void disablePoll(JettySolrRunner Jetty, SolrClient solrClient) + throws SolrServerException, IOException { + ModifiableSolrParams disablePollParams = new ModifiableSolrParams(); + disablePollParams.set(COMMAND, CMD_DISABLE_POLL); + disablePollParams.set(CommonParams.WT, JAVABIN); + disablePollParams.set(CommonParams.QT, ReplicationHandler.PATH); + QueryRequest req = new QueryRequest(disablePollParams); + withBasicAuth(req); + req.setBasePath(buildUrl(Jetty.getLocalPort())); + + solrClient.request(req, DEFAULT_TEST_CORENAME); + } + + private void pullIndexFromTo( + JettySolrRunner srcSolr, JettySolrRunner destSolr, boolean authEnabled) + throws SolrServerException, IOException { + String srcUrl = buildUrl(srcSolr.getLocalPort()) + "/" + DEFAULT_TEST_CORENAME; + String destUrl = buildUrl(destSolr.getLocalPort()) + "/" + DEFAULT_TEST_CORENAME; + QueryRequest req = getQueryRequestForFetchIndex(authEnabled, srcUrl); + req.setBasePath(buildUrl(destSolr.getLocalPort())); + followerClient.request(req, DEFAULT_TEST_CORENAME); + } + + private QueryRequest getQueryRequestForFetchIndex(boolean authEnabled, String srcUrl) { + ModifiableSolrParams solrParams = new ModifiableSolrParams(); + solrParams.set(COMMAND, CMD_FETCH_INDEX); + solrParams.set(CommonParams.WT, JAVABIN); + solrParams.set(CommonParams.QT, ReplicationHandler.PATH); + solrParams.set("leaderUrl", srcUrl); + solrParams.set("wait", "true"); + if (authEnabled) { + solrParams.set("httpBasicAuthUser", user); + solrParams.set("httpBasicAuthPassword", pass); + } + QueryRequest req = new QueryRequest(solrParams); + return req; + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java index 40fd27b7c8e..e437e2f0a18 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java @@ -135,7 +135,9 @@ protected Http2SolrClient(String serverBaseUrl, Builder builder) { this.httpClient = createHttpClient(builder); this.closeClient = true; } - + if (builder.listenerFactory != null) { + this.listenerFactory.addAll(builder.listenerFactory); + } updateDefaultMimeTypeForParser(); this.httpClient.setFollowRedirects(Boolean.TRUE.equals(builder.followRedirects)); @@ -147,6 +149,10 @@ public void addListenerFactory(HttpListenerFactory factory) { this.listenerFactory.add(factory); } + public List getListenerFactory() { + return listenerFactory; + } + // internal usage only HttpClient getHttpClient() { return httpClient; @@ -845,6 +851,13 @@ public static class Builder protected Long keyStoreReloadIntervalSecs; + public Http2SolrClient.Builder withListenerFactory(List listenerFactory) { + this.listenerFactory = listenerFactory; + return this; + } + + private List listenerFactory; + public Builder() { super(); } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java index a8831fd4842..b6642955c4a 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java @@ -587,5 +587,27 @@ public void testBuilder() { } } + @Test + public void testIdleTimeoutWithHttpClient() { + try (Http2SolrClient oldClient = + new Http2SolrClient.Builder("baseSolrUrl") + .withIdleTimeout(5000, TimeUnit.MILLISECONDS) + .build()) { + try (Http2SolrClient onlyBaseUrlChangedClient = + new Http2SolrClient.Builder("newBaseSolrUrl").withHttpClient(oldClient).build()) { + assertEquals(oldClient.getIdleTimeout(), onlyBaseUrlChangedClient.getIdleTimeout()); + assertEquals(oldClient.getHttpClient(), onlyBaseUrlChangedClient.getHttpClient()); + } + try (Http2SolrClient idleTimeoutChangedClient = + new Http2SolrClient.Builder("baseSolrUrl") + .withHttpClient(oldClient) + .withIdleTimeout(3000, TimeUnit.MILLISECONDS) + .build()) { + assertFalse(oldClient.getIdleTimeout() == idleTimeoutChangedClient.getIdleTimeout()); + assertEquals(3000, idleTimeoutChangedClient.getIdleTimeout()); + } + } + } + /* Missed tests : - set cookies via interceptor - invariant params - compression */ }