Skip to content

Commit

Permalink
Merge pull request #11 from mkhludnev/ubi-distr
Browse files Browse the repository at this point in the history
UBI goes distrib
  • Loading branch information
epugh authored Dec 10, 2024
2 parents dc9a0ed + a44abb6 commit fce0334
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.solr.handler.component;

import static org.apache.solr.handler.RequestHandlerBase.isInternalShardRequest;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
Expand All @@ -34,6 +36,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
Expand Down Expand Up @@ -244,33 +247,46 @@ public void process(ResponseBuilder rb) throws IOException {
if (!params.getBool(COMPONENT_NAME, false)) {
return;
}

doStuff(rb);
if (!isInternalShardRequest(rb.req)) { // subordinate shard req shouldn't yield logs
storeUbiDetails(
rb,
ubiQuery -> {
try {
DocList docList = ((ResultContext) rb.rsp.getResponse()).getDocList();
ubiQuery.setDocIds(extractDocIds(docList, rb.req.getSearcher()));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
}

@Override
public int distributedProcess(ResponseBuilder rb) throws IOException {

SolrParams params = rb.req.getParams();
if (!params.getBool(COMPONENT_NAME, false)) {
return ResponseBuilder.STAGE_DONE;
}

if (rb.stage != ResponseBuilder.STAGE_GET_FIELDS) {
return ResponseBuilder.STAGE_DONE;
if (rb.stage < ResponseBuilder.STAGE_GET_FIELDS) {
return ResponseBuilder.STAGE_GET_FIELDS;
}

doStuff(rb);
if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {
storeUbiDetails(
rb,
ubiQuery ->
ubiQuery.setDocIds(
String.join(",", rb.resultIds.keySet().stream().map(Object::toString).toList())));
return ResponseBuilder.STAGE_DONE;
}

return ResponseBuilder.STAGE_DONE;
}

public void doStuff(ResponseBuilder rb) throws IOException {

// Not sure why, but sometimes we get here twice... how can a response have
// the same component run twice?
private static UBIQuery getUbiQuery(ResponseBuilder rb) {
if (rb.rsp.getValues().get("ubi") != null) {
return;
return null;
}
SolrParams params = rb.req.getParams();

Expand Down Expand Up @@ -305,13 +321,14 @@ public void doStuff(ResponseBuilder rb) throws IOException {
}
}
}
return ubiQuery;
}

ResultContext rc = (ResultContext) rb.rsp.getResponse();
DocList docs = rc.getDocList();

String docIds = extractDocIds(docs, searcher);
ubiQuery.setDocIds(docIds);

/**
 * Builds the UBI query for the current request, lets the caller attach the matching document ids,
 * then adds the UBI section to the response and records the query.
 *
 * <p>Does nothing when {@code getUbiQuery} returns {@code null} (for example because the response
 * already carries a {@code "ubi"} entry).
 *
 * @param rb the response builder of the request being handled
 * @param docIdsSetter callback that fills in the doc ids on the freshly built {@link UBIQuery}
 * @throws IOException declared for the downstream response/recording calls
 */
private void storeUbiDetails(ResponseBuilder rb, Consumer<UBIQuery> docIdsSetter)
    throws IOException {
  final UBIQuery query = getUbiQuery(rb);
  if (query != null) {
    docIdsSetter.accept(query);
    addUserBehaviorInsightsToResponse(query, rb);
    recordQuery(query);
  }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.component;

import java.util.List;
import java.util.Map;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cluster.api.SimpleMap;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.MapSolrParams;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * SolrCloud test that issues a UBI-enabled query against a two-shard collection and then checks
 * that the query was recorded in the {@code ubi_queries} collection.
 */
@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40", "Lucene41", "Lucene42", "Lucene45"})
public class UBIComponentDistrQueriesTest extends SolrCloudTestCase {

  private static final String COLLECTIONORALIAS = "collection1";
  private static final int TIMEOUT = DEFAULT_TIMEOUT;

  /**
   * Collection the recorded UBI queries are looked up in.
   *
   * <p>NOTE(review): the component appears to write to a collection with this fixed name; confirm
   * whether it should be configurable.
   */
  private static final String UBI_QUERIES_COLLECTION = "ubi_queries";

  /** Upper bound on how long the test waits for the recorded query to become searchable. */
  private static final long RECORD_WAIT_MILLIS = 30_000L;

  /** Pause between two lookups while waiting for the recorded query. */
  private static final long RECORD_POLL_MILLIS = 250L;

  private static boolean useAlias;

  /**
   * Starts a four-node cluster, creates the queried collection (two shards, one replica each) from
   * the {@code ubi-enabled} configset, optionally aliases it, and creates the single-shard
   * collection that receives the recorded queries.
   */
  @BeforeClass
  public static void setupCluster() throws Exception {
    configureCluster(4)
        .addConfig("conf", TEST_PATH().resolve("configsets").resolve("ubi-enabled").resolve("conf"))
        .configure();

    // Alias mode is currently switched off; re-enable with random().nextBoolean().
    useAlias = false;
    final String collection = useAlias ? COLLECTIONORALIAS + "_collection" : COLLECTIONORALIAS;

    CollectionAdminRequest.createCollection(collection, "conf", 2, 1)
        .process(cluster.getSolrClient());

    cluster.waitForActiveCollection(collection, 2, 2);

    AbstractDistribZkTestBase.waitForRecoveriesToFinish(
        collection, cluster.getZkStateReader(), false, true, TIMEOUT);
    if (useAlias) {
      CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection)
          .process(cluster.getSolrClient());
    }

    // -------------------

    CollectionAdminRequest.createCollection(UBI_QUERIES_COLLECTION, "_default", 1, 1)
        .process(cluster.getSolrClient());

    cluster.waitForActiveCollection(UBI_QUERIES_COLLECTION, 1, 1);

    AbstractDistribZkTestBase.waitForRecoveriesToFinish(
        UBI_QUERIES_COLLECTION, cluster.getZkStateReader(), false, true, TIMEOUT);
  }

  /** Removes every document from the queried collection before each test. */
  @Before
  public void cleanIndex() throws Exception {
    new UpdateRequest().deleteByQuery("*:*").commit(cluster.getSolrClient(), COLLECTIONORALIAS);
  }

  /**
   * Indexes three documents, runs a distributed query with {@code ubi=true}, and verifies that the
   * response carries a query id and that a document with that id shows up in the recorded-queries
   * collection.
   */
  @Test
  public void testUBIQueryStream() throws Exception {
    cluster
        .getSolrClient(COLLECTIONORALIAS)
        .add(
            List.of(
                new SolrInputDocument("id", "1", "subject", "aa"),
                new SolrInputDocument("id", "2" /*"two"*/, "subject", "aa"),
                new SolrInputDocument("id", "3", "subject", "aa")));
    cluster.getSolrClient(COLLECTIONORALIAS).commit(true, true);

    QueryResponse queryResponse =
        cluster
            .getSolrClient(COLLECTIONORALIAS)
            .query(
                new MapSolrParams(Map.of("q", "aa", "df", "subject", "rows", "2", "ubi", "true")));

    SimpleMap<?> ubiSection = (SimpleMap<?>) queryResponse.getResponse().get("ubi");
    assertNotNull("response has no 'ubi' section", ubiSection);
    String qid = (String) ubiSection.get("query_id");
    assertNotNull("'ubi' section has no query_id", qid);
    assertTrue(qid.length() > 10);

    // The recorded query is not immediately searchable (presumably it is written and committed
    // asynchronously - TODO confirm), so poll instead of sleeping for a fixed time.
    QueryResponse queryCheck = awaitRecordedQuery(qid);

    // TODO check that ids were recorded
    // NOTE(review): the doc ids of the hits were not visible on the recorded document; confirm
    // whether they are expected to be stored there.
    assertEquals(1L, queryCheck.getResults().getNumFound());
    assertEquals(qid, queryCheck.getResults().get(0).get("id"));
  }

  /**
   * Repeatedly looks up {@code queryId} in the recorded-queries collection until it is found or
   * {@link #RECORD_WAIT_MILLIS} has elapsed.
   *
   * @param queryId id of the recorded query to look for
   * @return the last lookup response; the caller asserts on its contents
   */
  private static QueryResponse awaitRecordedQuery(String queryId) throws Exception {
    final MapSolrParams lookup = new MapSolrParams(Map.of("q", "id:" + queryId));
    final long deadlineNanos = System.nanoTime() + RECORD_WAIT_MILLIS * 1_000_000L;
    while (true) {
      QueryResponse rsp = cluster.getSolrClient(UBI_QUERIES_COLLECTION).query(lookup);
      boolean found = rsp.getResults().getNumFound() > 0;
      boolean timedOut = System.nanoTime() - deadlineNanos >= 0;
      if (found || timedOut) {
        return rsp;
      }
      Thread.sleep(RECORD_POLL_MILLIS);
    }
  }
}

0 comments on commit fce0334

Please sign in to comment.