Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

David leifker/elasticsearch optimization ext #6932

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a4d4ff6
quickstart updates
david-leifker Dec 29, 2022
095e04c
Fix registry for datahub-upgrade
david-leifker Dec 29, 2022
ba7836b
Merge branch 'feat/elasticsearch-optimization-ext' into david-leifker…
david-leifker Dec 29, 2022
c1e7d72
add missing version in datahub-upgrade
david-leifker Dec 29, 2022
369b0bf
Misc updates
david-leifker Dec 30, 2022
9b030c0
quickstart-update
david-leifker Dec 30, 2022
e27c09c
refactor(reindexing): refactor reindexing logic
david-leifker Dec 31, 2022
e25c2b0
Merge remote-tracking branch 'datahub-project/feat/elasticsearch-opti…
david-leifker Dec 31, 2022
5418a72
Merge remote-tracking branch 'datahub-project/feat/elasticsearch-opti…
david-leifker Dec 31, 2022
ea79de5
lint
david-leifker Dec 31, 2022
fc9ee63
fix test
david-leifker Dec 31, 2022
4b30347
Merge remote-tracking branch 'datahub-project/feat/elasticsearch-opti…
david-leifker Dec 31, 2022
4328ded
Merge remote-tracking branch 'datahub-project/feat/elasticsearch-opti…
david-leifker Dec 31, 2022
943af8f
fix tests
david-leifker Dec 31, 2022
7044d2c
Merge remote-tracking branch 'datahub-project/feat/elasticsearch-opti…
david-leifker Dec 31, 2022
e4725d5
Support aliases during clone
david-leifker Dec 31, 2022
626cad7
fulltext to structured flag
david-leifker Jan 2, 2023
35dc594
fix non-fulltext path with stemming
david-leifker Jan 2, 2023
0f4957b
Shutdown kafka consumer on success
david-leifker Jan 2, 2023
0b04393
Merge branch 'feat/elasticsearch-optimization-ext' into david-leifker…
david-leifker Jan 2, 2023
718f121
add env var to m1
david-leifker Jan 3, 2023
1530231
feat(elasticsearch): allow document mismatches
david-leifker Jan 3, 2023
247ea9c
Merge branch 'feat/elasticsearch-optimization-ext' into david-leifker…
david-leifker Jan 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ services:
depends_on:
- mysql
environment:
- BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-gms
- EBEAN_DATASOURCE_USERNAME=datahub
- EBEAN_DATASOURCE_PASSWORD=datahub
- EBEAN_DATASOURCE_HOST=mysql:3306
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class SearchableFieldSpecExtractor implements SchemaVisitor {
Map.of(
"enableAutocomplete", "false",
"fieldType", "URN",
"boostScore", "0.4"
"boostScore", "0.1"
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ public class BuildIndicesConfiguration {
private String backOffFactor;
private boolean waitForBuildIndices;
private boolean cloneIndices;
private boolean allowDocCountMismatch;

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

import com.linkedin.metadata.config.ElasticSearchConfiguration;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -67,6 +68,8 @@ public class ESIndexBuilder {
@Getter
private final boolean enableIndexMappingsReindex;

private final ElasticSearchConfiguration elasticSearchConfiguration;


public ReindexConfig buildReindexState(String indexName, Map<String, Object> mappings, Map<String, Object> settings) throws IOException {
ReindexConfig.ReindexConfigBuilder builder = ReindexConfig.builder()
Expand Down Expand Up @@ -205,12 +208,19 @@ private void reindex(String indexName, ReindexConfig indexState) throws IOExcept
}

if (originalCount != reindexedCount) {
log.info("Post-reindex document count is different, source_doc_count: {} reindex_doc_count: {}", originalCount,
reindexedCount);
diff(indexName, tempIndexName, Math.max(originalCount, reindexedCount));
searchClient.indices().delete(new DeleteIndexRequest().indices(tempIndexName), RequestOptions.DEFAULT);
throw new RuntimeException(String.format("Reindex from %s to %s failed. Document count %s != %s", indexName, tempIndexName,
originalCount, reindexedCount));
if (elasticSearchConfiguration.getBuildIndices().isAllowDocCountMismatch()
&& elasticSearchConfiguration.getBuildIndices().isCloneIndices()) {
log.warn("Index: {} - Post-reindex document count is different, source_doc_count: {} reindex_doc_count: {}\n"
+ "This condition is explicitly ALLOWED, please refer to latest clone if original index is required.",
indexName, originalCount, reindexedCount);
} else {
log.error("Index: {} - Post-reindex document count is different, source_doc_count: {} reindex_doc_count: {}",
indexName, originalCount, reindexedCount);
diff(indexName, tempIndexName, Math.max(originalCount, reindexedCount));
searchClient.indices().delete(new DeleteIndexRequest().indices(tempIndexName), RequestOptions.DEFAULT);
throw new RuntimeException(String.format("Reindex from %s to %s failed. Document count %s != %s", indexName, tempIndexName,
originalCount, reindexedCount));
}
}

log.info("Reindex from {} to {} succeeded", indexName, tempIndexName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.client.JavaEntityClient;
import com.linkedin.metadata.config.ElasticSearchConfiguration;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.SearchService;
Expand Down Expand Up @@ -67,7 +68,8 @@ protected EntityIndexBuilders entityIndexBuilders(
@Qualifier("sampleDataIndexConvention") IndexConvention indexConvention
) {
ESIndexBuilder indexBuilder = new ESIndexBuilder(_searchClient, 1, 0, 1,
1, Map.of(), true, false);
1, Map.of(), true, false,
new ElasticSearchConfiguration());
SettingsBuilder settingsBuilder = new SettingsBuilder(null);
return new EntityIndexBuilders(indexBuilder, entityRegistry, indexConvention, settingsBuilder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.client.JavaEntityClient;
import com.linkedin.metadata.config.ElasticSearchConfiguration;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.elastic.ESGraphQueryDAO;
import com.linkedin.metadata.graph.elastic.ESGraphWriteDAO;
Expand Down Expand Up @@ -72,7 +73,8 @@ protected EntityIndexBuilders entityIndexBuilders(
@Qualifier("searchLineageIndexConvention") IndexConvention indexConvention
) {
ESIndexBuilder indexBuilder = new ESIndexBuilder(_searchClient, 1, 0, 1,
1, Map.of(), true, false);
1, Map.of(), true, false,
new ElasticSearchConfiguration());
SettingsBuilder settingsBuilder = new SettingsBuilder(null);
return new EntityIndexBuilders(indexBuilder, entityRegistry, indexConvention, settingsBuilder);
}
Expand All @@ -93,7 +95,8 @@ protected ElasticSearchService entitySearchService(
@Nonnull
protected ESIndexBuilder esIndexBuilder() {
return new ESIndexBuilder(_searchClient, 1, 1, 1, 1, Map.of(),
true, true);
true, true,
new ElasticSearchConfiguration());
}

@Bean(name = "searchLineageGraphService")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.metadata;

import com.linkedin.metadata.config.ElasticSearchConfiguration;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistryException;
Expand Down Expand Up @@ -90,7 +91,8 @@ public ESBulkProcessor getBulkProcessor(@Qualifier("elasticSearchRestHighLevelCl
@Nonnull
protected ESIndexBuilder getIndexBuilder(@Qualifier("elasticSearchRestHighLevelClient") RestHighLevelClient searchClient) {
return new ESIndexBuilder(searchClient, 1, 1, 3, 1, Map.of(),
false, false);
false, false,
new ElasticSearchConfiguration());
}

@Bean(name = "entityRegistry")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.collect.ImmutableMap;
import com.linkedin.metadata.ESTestConfiguration;
import com.linkedin.metadata.config.ElasticSearchConfiguration;
import com.linkedin.metadata.systemmetadata.SystemMetadataMappingsBuilder;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
Expand Down Expand Up @@ -44,7 +45,8 @@ public class ESIndexBuilderTest extends AbstractTestNGSpringContextTests {
public void setup() {
_indexClient = _searchClient.indices();
testDefaultBuilder = new ESIndexBuilder(_searchClient, 1, 0, 0,
0, Map.of(), false, false);
0, Map.of(), false, false,
new ElasticSearchConfiguration());
}

@BeforeMethod
Expand Down Expand Up @@ -74,7 +76,8 @@ public static GetIndexResponse getTestIndex() throws IOException {
@Test
public void testESIndexBuilderCreation() throws Exception {
ESIndexBuilder customIndexBuilder = new ESIndexBuilder(_searchClient, 2, 0, 1,
0, Map.of(), false, false);
0, Map.of(), false, false,
new ElasticSearchConfiguration());
customIndexBuilder.buildIndex(TEST_INDEX_NAME, Map.of(), Map.of());
GetIndexResponse resp = getTestIndex();

Expand All @@ -86,7 +89,8 @@ public void testESIndexBuilderCreation() throws Exception {
@Test
public void testMappingReindex() throws Exception {
ESIndexBuilder enabledMappingReindex = new ESIndexBuilder(_searchClient, 1, 0, 0,
0, Map.of(), false, true);
0, Map.of(), false, true,
new ElasticSearchConfiguration());

// No mappings
enabledMappingReindex.buildIndex(TEST_INDEX_NAME, Map.of(), Map.of());
Expand Down Expand Up @@ -130,7 +134,8 @@ public void testSettingsNumberOfShardsReindex() throws Exception {
testDefaultBuilder.getNumRetries(),
testDefaultBuilder.getRefreshIntervalSeconds(),
Map.of(),
true, false);
true, false,
new ElasticSearchConfiguration());

// add new shard setting
changedShardBuilder.buildIndex(TEST_INDEX_NAME, Map.of(), Map.of());
Expand All @@ -156,28 +161,32 @@ public void testSettingsNoReindex() throws Exception {
testDefaultBuilder.getNumRetries(),
testDefaultBuilder.getRefreshIntervalSeconds(),
Map.of(),
true, false),
true, false,
new ElasticSearchConfiguration()),
new ESIndexBuilder(_searchClient,
testDefaultBuilder.getNumShards(),
testDefaultBuilder.getNumReplicas(),
testDefaultBuilder.getNumRetries(),
testDefaultBuilder.getRefreshIntervalSeconds() + 10,
Map.of(),
true, false),
true, false,
new ElasticSearchConfiguration()),
new ESIndexBuilder(_searchClient,
testDefaultBuilder.getNumShards() + 1,
testDefaultBuilder.getNumReplicas(),
testDefaultBuilder.getNumRetries(),
testDefaultBuilder.getRefreshIntervalSeconds(),
Map.of(),
false, false),
false, false,
new ElasticSearchConfiguration()),
new ESIndexBuilder(_searchClient,
testDefaultBuilder.getNumShards(),
testDefaultBuilder.getNumReplicas() + 1,
testDefaultBuilder.getNumRetries(),
testDefaultBuilder.getRefreshIntervalSeconds(),
Map.of(),
false, false)
false, false,
new ElasticSearchConfiguration())
);

for (ESIndexBuilder builder : noReindexBuilders) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import joptsimple.internal.Strings;
import lombok.RequiredArgsConstructor;
Expand All @@ -26,6 +27,7 @@
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

// We don't disable this on GMS since we want GMS to also wait until the indices are ready to read in case of
Expand Down Expand Up @@ -99,11 +101,13 @@ public void waitForUpdate() {
for (int i = 0; i < maxBackOffs; i++) {
if (isUpdated.get()) {
log.debug("Finished waiting for updated indices.");
String consumerId = Strings.join(List.of(CONSUMER_GROUP, SUFFIX), "");
try {
registry.getListenerContainer(consumerId).stop();
log.info("Containers: {}", registry.getListenerContainers().stream()
.map(MessageListenerContainer::getListenerId)
.collect(Collectors.toList()));
registry.getListenerContainer(consumerGroup).stop();
} catch (NullPointerException e) {
log.error("Expected consumer `{}` to shutdown.", consumerId);
log.error("Expected consumer `{}` to shutdown.", consumerGroup);
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.gms.factory.common.IndexConventionFactory;
import com.linkedin.gms.factory.common.RestHighLevelClientFactory;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.config.ElasticSearchConfiguration;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -28,7 +29,7 @@


@Configuration
@Import({RestHighLevelClientFactory.class, IndexConventionFactory.class})
@Import({RestHighLevelClientFactory.class, IndexConventionFactory.class, ElasticSearchConfiguration.class})
@PropertySource(value = "classpath:/application.yml", factory = YamlPropertySourceFactory.class)
public class ElasticSearchIndexBuilderFactory {

Expand Down Expand Up @@ -76,9 +77,10 @@ protected Map<String, Map<String, String>> getIndexSettingsOverrides(
@Bean(name = "elasticSearchIndexBuilder")
@Nonnull
protected ESIndexBuilder getInstance(
@Qualifier("elasticSearchIndexSettingsOverrides") Map<String, Map<String, String>> overrides) {
@Qualifier("elasticSearchIndexSettingsOverrides") Map<String, Map<String, String>> overrides,
final ElasticSearchConfiguration elasticSearchConfiguration) {
return new ESIndexBuilder(searchClient, numShards, numReplicas, numRetries, refreshIntervalSeconds, overrides,
enableSettingsReindex, enableMappingsReindex);
enableSettingsReindex, enableMappingsReindex, elasticSearchConfiguration);
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ elasticsearch:
settingsOverrides: ${ELASTICSEARCH_INDEX_BUILDER_SETTINGS_OVERRIDES:#{null}}
entitySettingsOverrides: ${ELASTICSEARCH_INDEX_BUILDER_ENTITY_SETTINGS_OVERRIDES:#{null}}
buildIndices:
allowDocCountMismatch: ${ELASTICSEARCH_BUILD_INDICES_ALLOW_DOC_COUNT_MISMATCH:false} # only takes effect when cloneIndices is also enabled
cloneIndices: ${ELASTICSEARCH_BUILD_INDICES_CLONE_INDICES:true}
initialBackOffMs: ${ELASTICSEARCH_BUILD_INDICES_INITIAL_BACK_OFF_MILLIS:5000}
maxBackOffs: ${ELASTICSEARCH_BUILD_INDICES_MAX_BACK_OFFS:5}
Expand Down