Skip to content

Commit

Permalink
Rework schema initialization for better performance (#1078)
Browse files Browse the repository at this point in the history
Co-authored-by: Bryan Burkholder <[email protected]>
  • Loading branch information
bryanlb and bryanlb authored Sep 18, 2024
1 parent 6c04841 commit 38b0a07
Show file tree
Hide file tree
Showing 14 changed files with 371 additions and 144 deletions.
29 changes: 26 additions & 3 deletions astra/src/main/java/com/slack/astra/blobfs/BlobStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,25 @@

import static software.amazon.awssdk.services.s3.model.ListObjectsV2Request.builder;

import com.slack.astra.chunk.ReadWriteChunk;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryUpload;
import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest;
import software.amazon.awssdk.transfer.s3.model.DownloadRequest;
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;

/**
Expand Down Expand Up @@ -80,9 +84,7 @@ public void upload(String prefix, Path directoryToUpload) {
*/
public void download(String prefix, Path destinationDirectory) {
assert prefix != null && !prefix.isEmpty();
if (destinationDirectory.toFile().exists()) {
assert destinationDirectory.toFile().isDirectory();
}
assert !destinationDirectory.toFile().exists() || destinationDirectory.toFile().isDirectory();

try {
transferManager
Expand All @@ -99,6 +101,27 @@ public void download(String prefix, Path destinationDirectory) {
}
}

/**
 * Downloads the schema file stored for a chunk and returns its raw contents.
 *
 * <p>The object is read from {@code bucketName} using the key {@code
 * <chunkId>/<ReadWriteChunk.SCHEMA_FILE_NAME>} and is buffered fully in memory. This call blocks
 * until the download has completed.
 *
 * @param chunkId key prefix under which the chunk's schema file is stored
 * @return the bytes of the schema object
 * @throws RuntimeException wrapping the underlying exception if the download fails, or if the
 *     calling thread is interrupted while waiting (the interrupt flag is restored first)
 */
public byte[] getSchema(String chunkId) {
  try {
    return transferManager
        .download(
            DownloadRequest.builder()
                .getObjectRequest(
                    GetObjectRequest.builder()
                        .bucket(bucketName)
                        .key(String.format("%s/%s", chunkId, ReadWriteChunk.SCHEMA_FILE_NAME))
                        .build())
                .responseTransformer(new ByteArrayAsyncResponseTransformer<>())
                .build())
        .completionFuture()
        .get()
        .result()
        .asByteArray();
  } catch (InterruptedException e) {
    // Catching InterruptedException clears the interrupt status; restore it so code further up
    // the stack (for example an executor that is shutting down) can still observe it.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  } catch (ExecutionException e) {
    throw new RuntimeException(e);
  }
}

/**
* Lists all files found on object store by the complete object key (including prefix). This
* includes what is generally considered the directory (i.e. foo/bar.example)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.slack.astra.clusterManager.CacheNodeAssignmentService.snapshotMetadataBySnapshotId;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.slack.astra.blobfs.BlobStore;
import com.slack.astra.chunk.Chunk;
import com.slack.astra.chunk.ReadOnlyChunkImpl;
Expand All @@ -25,6 +26,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -58,6 +61,10 @@ public class CachingChunkManager<T> extends ChunkManagerBase<T> {
private CacheNodeAssignmentStore cacheNodeAssignmentStore;
private CacheNodeMetadataStore cacheNodeMetadataStore;

// Cached thread pool whose threads are named "caching-chunk-manager-<n>". Chunk data downloads
// for new assignments are submitted to this pool.
private ExecutorService executorService =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("caching-chunk-manager-%d").build());

public CachingChunkManager(
MeterRegistry registry,
AsyncCuratorFramework curatorFramework,
Expand Down Expand Up @@ -223,7 +230,7 @@ private void onAssignmentHandler(CacheNodeAssignment assignment) {
cacheNodeAssignmentStore,
assignment,
snapshotsBySnapshotId.get(assignment.snapshotId));
Thread.ofVirtual().start(newChunk::downloadChunkData);
executorService.submit(newChunk::downloadChunkData);
chunkMap.put(assignment.assignmentId, newChunk);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.slack.astra.logstore.opensearch.AstraBigArrays;
import com.slack.astra.logstore.opensearch.AstraIndexSettings;
import com.slack.astra.logstore.opensearch.OpenSearchAdapter;
import com.slack.astra.logstore.search.SearchResultUtils;
import com.slack.astra.logstore.search.aggregations.AutoDateHistogramAggBuilder;
Expand Down Expand Up @@ -265,7 +266,7 @@ private static DateRangeQueryVisitor getDateRange(String queryBody) {
QueryShardContext queryShardContext =
new QueryShardContext(
0,
openSearchAdapter.getIndexSettings(),
AstraIndexSettings.getInstance(),
AstraBigArrays.getInstance(),
null,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
public class AstraBigArrays {
private static BigArrays bigArray = null;

private AstraBigArrays() {}

public static BigArrays getInstance() {
if (bigArray == null) {
PageCacheRecycler pageCacheRecycler =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.slack.astra.logstore.opensearch;

import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.opensearch.index.analysis.AnalyzerScope;
import org.opensearch.index.analysis.IndexAnalyzers;
import org.opensearch.index.analysis.NamedAnalyzer;

/**
 * Provides a single, shared {@link IndexAnalyzers} instance.
 *
 * <p>The shared instance registers one analyzer under the name {@code "default"}: an
 * index-scoped {@link NamedAnalyzer} wrapping a {@link StandardAnalyzer}. The two remaining maps
 * passed to the {@link IndexAnalyzers} constructor are empty.
 */
public class AstraIndexAnalyzer {

  private AstraIndexAnalyzer() {}

  /**
   * Returns the shared {@link IndexAnalyzers} instance, creating it on first use.
   *
   * @return the same instance on every call
   */
  public static IndexAnalyzers getInstance() {
    return Holder.INSTANCE;
  }

  /**
   * Initialization-on-demand holder: the JVM initializes this class lazily and exactly once, so
   * concurrent first callers cannot race and build duplicate instances.
   */
  private static final class Holder {
    private static final IndexAnalyzers INSTANCE =
        new IndexAnalyzers(
            singletonMap(
                "default",
                new NamedAnalyzer("default", AnalyzerScope.INDEX, new StandardAnalyzer())),
            emptyMap(),
            emptyMap());

    private Holder() {}
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.slack.astra.logstore.opensearch;

import static org.opensearch.common.settings.IndexScopedSettings.BUILT_IN_INDEX_SETTINGS;

import com.slack.astra.logstore.LogMessage;
import java.util.HashSet;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.mapper.MapperService;

/**
 * Provides a single, shared {@link IndexSettings} instance.
 *
 * <p>The mapping total-fields limit is read once from the {@code astra.mapping.totalFieldsLimit}
 * system property and defaults to 2500.
 */
public class AstraIndexSettings {
  // we can make this configurable when SchemaAwareLogDocumentBuilderImpl enforces a limit
  // set this to a high number for now
  private static final int TOTAL_FIELDS_LIMIT =
      Integer.parseInt(System.getProperty("astra.mapping.totalFieldsLimit", "2500"));

  private AstraIndexSettings() {}

  /**
   * Returns the shared {@link IndexSettings} instance, building it on first use.
   *
   * @return the same instance on every call
   */
  public static IndexSettings getInstance() {
    return Holder.INSTANCE;
  }

  /**
   * Initialization-on-demand holder: the JVM initializes this class lazily and exactly once, so
   * concurrent first callers cannot race and build duplicate instances.
   */
  private static final class Holder {
    private static final IndexSettings INSTANCE = buildIndexSettings();

    private Holder() {}
  }

  /** Builds the minimal amount of IndexSettings required for using Aggregations */
  private static IndexSettings buildIndexSettings() {
    Settings settings =
        Settings.builder()
            .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.V_2_11_0)
            .put(
                MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), TOTAL_FIELDS_LIMIT)

            // Astra time sorts the indexes while building it
            // {LuceneIndexStoreImpl#buildIndexWriterConfig}
            // When we were using the lucene query parser the sort info was leveraged by lucene
            // automatically (as the sort info persists in the segment info) at query time.
            // However the OpenSearch query parser has a custom implementation which relies on the
            // index sort info to be present as a setting here.
            .put("index.sort.field", LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName)
            .put("index.sort.order", "desc")
            .put("index.query.default_field", LogMessage.SystemField.ALL.fieldName)
            .put("index.query_string.lenient", false)
            .build();

    Settings nodeSettings =
        Settings.builder().put("indices.query.query_string.analyze_wildcard", true).build();

    IndexScopedSettings indexScopedSettings =
        new IndexScopedSettings(settings, new HashSet<>(BUILT_IN_INDEX_SETTINGS));

    return new IndexSettings(
        IndexMetadata.builder("index").settings(settings).build(),
        nodeSettings,
        indexScopedSettings);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.slack.astra.logstore.opensearch;

import static java.util.Collections.emptyList;

import java.util.Collections;
import java.util.Map;
import org.opensearch.index.mapper.Mapper;
import org.opensearch.index.mapper.MetadataFieldMapper;
import org.opensearch.indices.IndicesModule;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.plugins.MapperPlugin;

/**
 * Factory for {@link MapperRegistry} instances that reuses one shared pair of mapper maps.
 *
 * <p>Every call to {@link #buildNewInstance()} returns a new {@link MapperRegistry}, but the
 * mapper and metadata-mapper maps it is built from are computed once and wrapped as unmodifiable.
 */
public class AstraMapperRegistry {

  private AstraMapperRegistry() {}

  /**
   * Creates a new {@link MapperRegistry} backed by the shared mapper maps and {@link
   * MapperPlugin#NOOP_FIELD_FILTER}.
   *
   * @return a new registry instance on every call
   */
  public static MapperRegistry buildNewInstance() {
    return new MapperRegistry(
        Holder.MAPPERS, Holder.METADATA_MAPPERS, MapperPlugin.NOOP_FIELD_FILTER);
  }

  /**
   * Initialization-on-demand holder: the JVM initializes this class lazily and exactly once, so
   * both maps are computed a single time even when first accessed from several threads at once.
   */
  private static final class Holder {
    private static final Map<String, Mapper.TypeParser> MAPPERS =
        Collections.unmodifiableMap(IndicesModule.getMappers(emptyList()));
    private static final Map<String, MetadataFieldMapper.TypeParser> METADATA_MAPPERS =
        Collections.unmodifiableMap(IndicesModule.getMetadataMappers(emptyList()));

    private Holder() {}
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.slack.astra.logstore.opensearch;

import static java.util.Collections.emptyMap;

import org.opensearch.index.similarity.SimilarityService;

/**
 * Provides a single, shared {@link SimilarityService} instance.
 *
 * <p>The instance is constructed from {@link AstraIndexSettings#getInstance()}, a {@code null}
 * second constructor argument, and an empty map as the third.
 */
public class AstraSimilarityService {

  private AstraSimilarityService() {}

  /**
   * Returns the shared {@link SimilarityService} instance, creating it on first use.
   *
   * @return the same instance on every call
   */
  public static SimilarityService getInstance() {
    return Holder.INSTANCE;
  }

  /**
   * Initialization-on-demand holder: the JVM initializes this class lazily and exactly once, so
   * concurrent first callers cannot race and build duplicate instances.
   */
  private static final class Holder {
    private static final SimilarityService INSTANCE =
        new SimilarityService(AstraIndexSettings.getInstance(), null, emptyMap());

    private Holder() {}
  }
}
Loading

0 comments on commit 38b0a07

Please sign in to comment.