Skip to content

Commit

Permalink
David leifker/elasticsearch optimization ext (#6925)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Jan 2, 2023
1 parent 8faf542 commit 200d170
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.datahub.graphql.types.chart.ChartType;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.ESSampleDataFixture;
import com.linkedin.metadata.search.AggregationMetadata;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchResult;
import com.linkedin.metadata.search.SearchService;
Expand Down Expand Up @@ -403,6 +404,18 @@ public void testMinNumberLengthLimit() throws IOException {
String.format("Expected: %s Actual: %s", expected, actual));
}

@Test
public void testFacets() throws IOException {
Set<String> expectedFacets = Set.of("entity", "typeNames", "platform", "origin", "tags");
SearchResult testResult = search(searchService, "cypress");
expectedFacets.forEach(facet -> {
assertTrue(testResult.getMetadata().getAggregations().stream().anyMatch(agg -> agg.getName().equals(facet)),
String.format("Failed to find facet `%s` in %s", facet,
testResult.getMetadata().getAggregations().stream()
.map(AggregationMetadata::getName).collect(Collectors.toList())));
});
}

private Stream<AnalyzeResponse.AnalyzeToken> getTokens(AnalyzeRequest request) throws IOException {
return _searchClient.indices().analyze(request, RequestOptions.DEFAULT).getTokens().stream();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,23 @@
import com.linkedin.metadata.version.GitVersion;
import com.linkedin.mxe.BuildIndicesHistoryEvent;
import com.linkedin.mxe.Topics;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import joptsimple.internal.Strings;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.stereotype.Component;
Expand All @@ -29,8 +35,11 @@
@Slf4j
@EnableKafka
public class BuildIndicesKafkaListener implements ConsumerSeekAware, BootstrapDependency {
@Autowired
private KafkaListenerEndpointRegistry registry;

private static final String CONSUMER_GROUP = "${BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID:generic-bihe-consumer-job-client}";
private static final String SUFFIX = "temp";
private static final String TOPIC_NAME = "${BUILD_INDICES_HISTORY_TOPIC_NAME:" + Topics.BUILD_INDICES_HISTORY_TOPIC_NAME + "}";

private final DefaultKafkaConsumerFactory<String, GenericRecord> _defaultKafkaConsumerFactory;
Expand All @@ -50,7 +59,7 @@ public class BuildIndicesKafkaListener implements ConsumerSeekAware, BootstrapDe
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
try (Consumer<String, GenericRecord> kafkaConsumer =
_defaultKafkaConsumerFactory.createConsumer(consumerGroup, "temp")) {
_defaultKafkaConsumerFactory.createConsumer(consumerGroup, SUFFIX)) {
final Map<TopicPartition, Long> offsetMap = kafkaConsumer.endOffsets(assignments.keySet());
assignments.entrySet().stream()
.filter(entry -> topicName.equals(entry.getKey().topic()))
Expand Down Expand Up @@ -90,6 +99,12 @@ public void waitForUpdate() {
for (int i = 0; i < maxBackOffs; i++) {
if (isUpdated.get()) {
log.debug("Finished waiting for updated indices.");
String consumerId = Strings.join(List.of(CONSUMER_GROUP, SUFFIX), "");
try {
registry.getListenerContainer(consumerId).stop();
} catch (NullPointerException e) {
log.error("Expected consumer `{}` to shutdown.", consumerId);
}
return;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ record GlobalTags {
"fieldName": "tags",
"fieldType": "URN",
"boostScore": 0.5,
"queryByDefault": true
"queryByDefault": true,
"addToFilters": true
}
}
tags: array[TagAssociation]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ describe("search", () => {
cy.visit(
"/search?filter_entity=DATASET&filter_tags=urn%3Ali%3Atag%3ACypress&page=1&query=users_created"
);
cy.contains("of 1 result");
cy.contains("of 2 result");

cy.contains("Cypress");

Expand Down

0 comments on commit 200d170

Please sign in to comment.