diff --git a/astra/src/main/java/com/slack/astra/blobfs/BlobStore.java b/astra/src/main/java/com/slack/astra/blobfs/BlobStore.java index 6bbc052a70..393030e5c0 100644 --- a/astra/src/main/java/com/slack/astra/blobfs/BlobStore.java +++ b/astra/src/main/java/com/slack/astra/blobfs/BlobStore.java @@ -2,21 +2,25 @@ import static software.amazon.awssdk.services.s3.model.ListObjectsV2Request.builder; +import com.slack.astra.chunk.ReadWriteChunk; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.Delete; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher; import software.amazon.awssdk.transfer.s3.S3TransferManager; import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryUpload; import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest; +import software.amazon.awssdk.transfer.s3.model.DownloadRequest; import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest; /** @@ -80,9 +84,7 @@ public void upload(String prefix, Path directoryToUpload) { */ public void download(String prefix, Path destinationDirectory) { assert prefix != null && !prefix.isEmpty(); - if (destinationDirectory.toFile().exists()) { - assert destinationDirectory.toFile().isDirectory(); - } + assert !destinationDirectory.toFile().exists() || destinationDirectory.toFile().isDirectory(); try { transferManager @@ -99,6 +101,27 @@ public void download(String prefix, Path destinationDirectory) { } } + public byte[] getSchema(String chunkId) { + try { + return transferManager + .download( + DownloadRequest.builder() + .getObjectRequest( + GetObjectRequest.builder() + .bucket(bucketName) + .key(String.format("%s/%s", chunkId, ReadWriteChunk.SCHEMA_FILE_NAME)) + .build()) + .responseTransformer(new ByteArrayAsyncResponseTransformer<>()) + .build()) + .completionFuture() + .get() + .result() + .asByteArray(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + /** * Lists all files found on object store by the complete object key (including prefix). This would * included what is generally considered the directory (ie foo/bar.example) diff --git a/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java b/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java index b7e75cf048..4d0630f757 100644 --- a/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java +++ b/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java @@ -2,6 +2,7 @@ import static com.slack.astra.clusterManager.CacheNodeAssignmentService.snapshotMetadataBySnapshotId; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.slack.astra.blobfs.BlobStore; import com.slack.astra.chunk.Chunk; import com.slack.astra.chunk.ReadOnlyChunkImpl; @@ -25,6 +26,8 @@ import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.curator.x.async.AsyncCuratorFramework; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +61,10 @@ public class CachingChunkManager extends ChunkManagerBase { private CacheNodeAssignmentStore cacheNodeAssignmentStore; private CacheNodeMetadataStore cacheNodeMetadataStore; + private ExecutorService executorService = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("caching-chunk-manager-%d").build()); + public CachingChunkManager( MeterRegistry registry, AsyncCuratorFramework curatorFramework, @@ -223,7 +230,7 @@ private void onAssignmentHandler(CacheNodeAssignment assignment) { cacheNodeAssignmentStore, assignment, snapshotsBySnapshotId.get(assignment.snapshotId)); - Thread.ofVirtual().start(newChunk::downloadChunkData); + executorService.submit(newChunk::downloadChunkData); chunkMap.put(assignment.assignmentId, newChunk); } } diff --git a/astra/src/main/java/com/slack/astra/elasticsearchApi/OpenSearchRequest.java b/astra/src/main/java/com/slack/astra/elasticsearchApi/OpenSearchRequest.java index b07c0a7282..b08b4e2552 100644 --- a/astra/src/main/java/com/slack/astra/elasticsearchApi/OpenSearchRequest.java +++ b/astra/src/main/java/com/slack/astra/elasticsearchApi/OpenSearchRequest.java @@ -10,6 +10,7 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.slack.astra.logstore.opensearch.AstraBigArrays; +import com.slack.astra.logstore.opensearch.AstraIndexSettings; import com.slack.astra.logstore.opensearch.OpenSearchAdapter; import com.slack.astra.logstore.search.SearchResultUtils; import com.slack.astra.logstore.search.aggregations.AutoDateHistogramAggBuilder; @@ -265,7 +266,7 @@ private static DateRangeQueryVisitor getDateRange(String queryBody) { QueryShardContext queryShardContext = new QueryShardContext( 0, - openSearchAdapter.getIndexSettings(), + AstraIndexSettings.getInstance(), AstraBigArrays.getInstance(), null, null, diff --git a/astra/src/main/java/com/slack/astra/logstore/opensearch/AstraBigArrays.java b/astra/src/main/java/com/slack/astra/logstore/opensearch/AstraBigArrays.java index 288b1a4bca..22e90209c8 100644 --- a/astra/src/main/java/com/slack/astra/logstore/opensearch/AstraBigArrays.java +++ b/astra/src/main/java/com/slack/astra/logstore/opensearch/AstraBigArrays.java @@ -14,6 +14,8 @@ public class AstraBigArrays { private static BigArrays bigArray = null; + private AstraBigArrays() {} + public static BigArrays getInstance() { if (bigArray == null) { PageCacheRecycler pageCacheRecycler = diff --git a/astra/src/main/java/com/slack/astra/logstore/opensearch/AstraIndexAnalyzer.java b/astra/src/main/java/com/slack/astra/logstore/opensearch/AstraIndexAnalyzer.java new file mode 100644 index 0000000000..3d4d338cb1 --- /dev/null +++ b/astra/src/main/java/com/slack/astra/logstore/opensearch/AstraIndexAnalyzer.java @@ -0,0 +1,29 @@ +package com.slack.astra.logstore.opensearch; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; + +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.opensearch.index.analysis.AnalyzerScope; +import org.opensearch.index.analysis.IndexAnalyzers; +import org.opensearch.index.analysis.NamedAnalyzer; + +public class AstraIndexAnalyzer { + + private static IndexAnalyzers indexAnalyzers; + + private AstraIndexAnalyzer() {} + + public static IndexAnalyzers getInstance() { + if (indexAnalyzers == null) { + indexAnalyzers = + new IndexAnalyzers( + singletonMap( + "default", + new NamedAnalyzer("default", AnalyzerScope.INDEX, new StandardAnalyzer())), + emptyMap(), + emptyMap()); + } + return indexAnalyzers; + } +} diff --git a/astra/src/main/java/com/slack/astra/logstore/opensearch/AstraIndexSettings.java b/astra/src/main/java/com/slack/astra/logstore/opensearch/AstraIndexSettings.java new file mode 100644 index 0000000000..d463bd77f1 --- /dev/null +++ b/astra/src/main/java/com/slack/astra/logstore/opensearch/AstraIndexSettings.java @@ -0,0 +1,64 @@ +package com.slack.astra.logstore.opensearch; + +import static org.opensearch.common.settings.IndexScopedSettings.BUILT_IN_INDEX_SETTINGS; + +import com.slack.astra.logstore.LogMessage; +import java.util.HashSet; +import org.opensearch.Version; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.IndexScopedSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.mapper.MapperService; + +public class AstraIndexSettings { + private static IndexSettings indexSettings = null; + + // we can make this configurable when SchemaAwareLogDocumentBuilderImpl enforces a limit + // set this to a high number for now + private static final int TOTAL_FIELDS_LIMIT = + Integer.parseInt(System.getProperty("astra.mapping.totalFieldsLimit", "2500")); + + private AstraIndexSettings() {} + + public static IndexSettings getInstance() { + if (indexSettings == null) { + indexSettings = buildIndexSettings(); + } + return indexSettings; + } + + /** Builds the minimal amount of IndexSettings required for using Aggregations */ + private static IndexSettings buildIndexSettings() { + Settings settings = + Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.V_2_11_0) + .put( + MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), TOTAL_FIELDS_LIMIT) + + // Astra time sorts the indexes while building it + // {LuceneIndexStoreImpl#buildIndexWriterConfig} + // When we were using the lucene query parser the sort info was leveraged by lucene + // automatically ( as the sort info persists in the segment info ) at query time. + // However the OpenSearch query parser has a custom implementation which relies on the + // index sort info to be present as a setting here. + .put("index.sort.field", LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName) + .put("index.sort.order", "desc") + .put("index.query.default_field", LogMessage.SystemField.ALL.fieldName) + .put("index.query_string.lenient", false) + .build(); + + Settings nodeSetings = + Settings.builder().put("indices.query.query_string.analyze_wildcard", true).build(); + + IndexScopedSettings indexScopedSettings = + new IndexScopedSettings(settings, new HashSet<>(BUILT_IN_INDEX_SETTINGS)); + + return new IndexSettings( + IndexMetadata.builder("index").settings(settings).build(), + nodeSetings, + indexScopedSettings); + } +} diff --git a/astra/src/main/java/com/slack/astra/logstore/opensearch/AstraMapperRegistry.java b/astra/src/main/java/com/slack/astra/logstore/opensearch/AstraMapperRegistry.java new file mode 100644 index 0000000000..6bc3e7c3bc --- /dev/null +++ b/astra/src/main/java/com/slack/astra/logstore/opensearch/AstraMapperRegistry.java @@ -0,0 +1,26 @@ +package com.slack.astra.logstore.opensearch; + +import static java.util.Collections.emptyList; + +import java.util.Collections; +import java.util.Map; +import org.opensearch.index.mapper.Mapper; +import org.opensearch.index.mapper.MetadataFieldMapper; +import org.opensearch.indices.IndicesModule; +import org.opensearch.indices.mapper.MapperRegistry; +import org.opensearch.plugins.MapperPlugin; + +public class AstraMapperRegistry { + private static Map mappers; + private static Map metadataMappers; + + private AstraMapperRegistry() {} + + public static MapperRegistry buildNewInstance() { + if (mappers == null || metadataMappers == null) { + mappers = Collections.unmodifiableMap(IndicesModule.getMappers(emptyList())); + metadataMappers = Collections.unmodifiableMap(IndicesModule.getMetadataMappers(emptyList())); + } + return new MapperRegistry(mappers, metadataMappers, MapperPlugin.NOOP_FIELD_FILTER); + } +} diff --git a/astra/src/main/java/com/slack/astra/logstore/opensearch/AstraSimilarityService.java b/astra/src/main/java/com/slack/astra/logstore/opensearch/AstraSimilarityService.java new file mode 100644 index 0000000000..beae7df9fe --- /dev/null +++ b/astra/src/main/java/com/slack/astra/logstore/opensearch/AstraSimilarityService.java @@ -0,0 +1,18 @@ +package com.slack.astra.logstore.opensearch; + +import static java.util.Collections.emptyMap; + +import org.opensearch.index.similarity.SimilarityService; + +public class AstraSimilarityService { + private static SimilarityService similarityService = null; + + private AstraSimilarityService() {} + + public static SimilarityService getInstance() { + if (similarityService == null) { + similarityService = new SimilarityService(AstraIndexSettings.getInstance(), null, emptyMap()); + } + return similarityService; + } +} diff --git a/astra/src/main/java/com/slack/astra/logstore/opensearch/OpenSearchAdapter.java b/astra/src/main/java/com/slack/astra/logstore/opensearch/OpenSearchAdapter.java index 9faafee697..7b65ca51ad 100644 --- a/astra/src/main/java/com/slack/astra/logstore/opensearch/OpenSearchAdapter.java +++ b/astra/src/main/java/com/slack/astra/logstore/opensearch/OpenSearchAdapter.java @@ -1,11 +1,8 @@ package com.slack.astra.logstore.opensearch; -import static java.util.Collections.emptyList; -import static java.util.Collections.emptyMap; -import static java.util.Collections.singletonMap; -import static org.opensearch.common.settings.IndexScopedSettings.BUILT_IN_INDEX_SETTINGS; - -import com.slack.astra.logstore.LogMessage; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.slack.astra.logstore.search.aggregations.AggBuilder; import com.slack.astra.logstore.search.aggregations.AggBuilderBase; import com.slack.astra.logstore.search.aggregations.AutoDateHistogramAggBuilder; @@ -32,23 +29,17 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.stream.Collectors; import org.apache.commons.lang3.ObjectUtils; -import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.search.CollectorManager; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; -import org.opensearch.Version; import org.opensearch.cluster.ClusterModule; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.CheckedConsumer; import org.opensearch.common.compress.CompressedXContent; -import org.opensearch.common.settings.IndexScopedSettings; -import org.opensearch.common.settings.Settings; import org.opensearch.common.util.BigArrays; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.common.bytes.BytesReference; @@ -56,9 +47,6 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.IndexSettings; -import org.opensearch.index.analysis.AnalyzerScope; -import org.opensearch.index.analysis.IndexAnalyzers; -import org.opensearch.index.analysis.NamedAnalyzer; import org.opensearch.index.fielddata.IndexFieldDataCache; import org.opensearch.index.fielddata.IndexFieldDataService; import org.opensearch.index.mapper.MappedFieldType; @@ -68,7 +56,6 @@ import org.opensearch.index.query.QueryShardContext; import org.opensearch.index.query.QueryStringQueryBuilder; import org.opensearch.index.similarity.SimilarityService; -import org.opensearch.indices.IndicesModule; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.script.Script; import org.opensearch.search.aggregations.AbstractAggregationBuilder; @@ -120,22 +107,15 @@ public class OpenSearchAdapter { private static final Logger LOG = LoggerFactory.getLogger(OpenSearchAdapter.class); - private final IndexSettings indexSettings; - private final SimilarityService similarityService; + private static final IndexSettings indexSettings = AstraIndexSettings.getInstance(); + private static final SimilarityService similarityService = AstraSimilarityService.getInstance(); private final MapperService mapperService; private final Map chunkSchema; - // we can make this configurable when SchemaAwareLogDocumentBuilderImpl enforces a limit - // set this to a high number for now - private static final int TOTAL_FIELDS_LIMIT = - Integer.parseInt(System.getProperty("astra.mapping.totalFieldsLimit", "2500")); - public OpenSearchAdapter(Map chunkSchema) { - this.indexSettings = buildIndexSettings(); - this.similarityService = new SimilarityService(indexSettings, null, emptyMap()); - this.mapperService = buildMapperService(indexSettings, similarityService); + this.mapperService = buildMapperService(); this.chunkSchema = chunkSchema; } @@ -152,12 +132,7 @@ public OpenSearchAdapter(Map chunkSchema) { public Query buildQuery(IndexSearcher indexSearcher, QueryBuilder queryBuilder) throws IOException { QueryShardContext queryShardContext = - buildQueryShardContext( - AstraBigArrays.getInstance(), - indexSettings, - indexSearcher, - similarityService, - mapperService); + buildQueryShardContext(AstraBigArrays.getInstance(), indexSearcher, mapperService); if (queryBuilder != null) { try { @@ -173,50 +148,19 @@ public Query buildQuery(IndexSearcher indexSearcher, QueryBuilder queryBuilder) /** * For each defined field in the chunk schema, this will check if the field is already registered, - * and if not attempt to register it with the mapper service + * and if not attempt to register it with the mapper service. + * + * @see this.loadSchema() */ public void reloadSchema() { // TreeMap here ensures the schema is sorted by natural order - to ensure multifields are // registered by their parent first, and then fields added second for (Map.Entry entry : new TreeMap<>(chunkSchema).entrySet()) { + String fieldMapping = getFieldMapping(entry.getValue().fieldType); try { - if (entry.getValue().fieldType == FieldType.TEXT) { - tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "text")); - } else if (entry.getValue().fieldType == FieldType.STRING - || entry.getValue().fieldType == FieldType.KEYWORD - || entry.getValue().fieldType == FieldType.ID) { - tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "keyword")); - } else if (entry.getValue().fieldType == FieldType.IP) { - tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "ip")); - } else if (entry.getValue().fieldType == FieldType.DATE) { - tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "date")); - } else if (entry.getValue().fieldType == FieldType.BOOLEAN) { - tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "boolean")); - } else if (entry.getValue().fieldType == FieldType.DOUBLE) { - tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "double")); - } else if (entry.getValue().fieldType == FieldType.FLOAT) { - tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "float")); - } else if (entry.getValue().fieldType == FieldType.HALF_FLOAT) { - tryRegisterField( - mapperService, entry.getValue().name, b -> b.field("type", "half_float")); - } else if (entry.getValue().fieldType == FieldType.INTEGER) { - tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "integer")); - } else if (entry.getValue().fieldType == FieldType.LONG) { - tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "long")); - } else if (entry.getValue().fieldType == FieldType.SCALED_LONG) { + if (fieldMapping != null) { tryRegisterField( - mapperService, entry.getValue().name, b -> b.field("type", "scaled_long")); - } else if (entry.getValue().fieldType == FieldType.SHORT) { - tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "short")); - } else if (entry.getValue().fieldType == FieldType.BYTE) { - tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "byte")); - } else if (entry.getValue().fieldType == FieldType.BINARY) { - tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "binary")); - } else { - LOG.warn( - "Field type '{}' is not yet currently supported for field '{}'", - entry.getValue().fieldType, - entry.getValue().name); + mapperService, entry.getValue().name, b -> b.field("type", fieldMapping)); } } catch (Exception e) { LOG.error("Error parsing schema mapping for {}", entry.getValue().toString(), e); @@ -288,7 +232,7 @@ public InternalAggregation reduce(Collection collectors) throws IOEx internalAggregationList.add(collector.buildTopLevel()); } - if (internalAggregationList.size() == 0) { + if (internalAggregationList.isEmpty()) { return null; } else { // Using the first element on the list as the basis for the reduce method is per @@ -296,7 +240,7 @@ public InternalAggregation reduce(Collection collectors) throws IOEx // reusing an existing instance (typically the first in the given list) to save // on redundant object construction." return internalAggregationList - .get(0) + .getFirst() .reduce( internalAggregationList, InternalAggregation.ReduceContext.forPartialReduction( @@ -335,58 +279,18 @@ private static ValuesSourceRegistry buildValueSourceRegistry() { return valuesSourceRegistryBuilder.build(); } - /** Builds the minimal amount of IndexSettings required for using Aggregations */ - protected static IndexSettings buildIndexSettings() { - Settings settings = - Settings.builder() - .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.V_2_11_0) - .put( - MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), TOTAL_FIELDS_LIMIT) - - // Astra time sorts the indexes while building it - // {LuceneIndexStoreImpl#buildIndexWriterConfig} - // When we were using the lucene query parser the sort info was leveraged by lucene - // automatically ( as the sort info persists in the segment info ) at query time. - // However, the OpenSearch query parser has a custom implementation which relies on the - // index sort info to be present as a setting here. - .put("index.sort.field", LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName) - .put("index.sort.order", "desc") - .put("index.query.default_field", LogMessage.SystemField.ALL.fieldName) - .put("index.query_string.lenient", false) - .build(); - - Settings nodeSetings = - Settings.builder().put("indices.query.query_string.analyze_wildcard", true).build(); - - IndexScopedSettings indexScopedSettings = - new IndexScopedSettings(settings, new HashSet<>(BUILT_IN_INDEX_SETTINGS)); - - return new IndexSettings( - IndexMetadata.builder("index").settings(settings).build(), - nodeSetings, - indexScopedSettings); - } - /** * Builds a MapperService using the minimal amount of settings required for Aggregations. After * initializing the mapper service, individual fields will still need to be added using * this.registerField() */ - private static MapperService buildMapperService( - IndexSettings indexSettings, SimilarityService similarityService) { + private static MapperService buildMapperService() { return new MapperService( - indexSettings, - new IndexAnalyzers( - singletonMap( - "default", - new NamedAnalyzer("default", AnalyzerScope.INDEX, new StandardAnalyzer())), - emptyMap(), - emptyMap()), + OpenSearchAdapter.indexSettings, + AstraIndexAnalyzer.getInstance(), new NamedXContentRegistry(ClusterModule.getNamedXWriteables()), - similarityService, - new IndicesModule(emptyList()).getMapperRegistry(), + OpenSearchAdapter.similarityService, + AstraMapperRegistry.buildNewInstance(), () -> { throw new UnsupportedOperationException(); }, @@ -399,26 +303,23 @@ private static MapperService buildMapperService( * AggregatorFactory to successfully instantiate. See AggregatorFactory.class */ private static QueryShardContext buildQueryShardContext( - BigArrays bigArrays, - IndexSettings indexSettings, - IndexSearcher indexSearcher, - SimilarityService similarityService, - MapperService mapperService) { + BigArrays bigArrays, IndexSearcher indexSearcher, MapperService mapperService) { final ValuesSourceRegistry valuesSourceRegistry = buildValueSourceRegistry(); return new QueryShardContext( 0, - indexSettings, + OpenSearchAdapter.indexSettings, bigArrays, null, new IndexFieldDataService( - indexSettings, + OpenSearchAdapter.indexSettings, new IndicesFieldDataCache( - indexSettings.getSettings(), new IndexFieldDataCache.Listener() {}), + OpenSearchAdapter.indexSettings.getSettings(), + new IndexFieldDataCache.Listener() {}), new NoneCircuitBreakerService(), mapperService) ::getForField, mapperService, - similarityService, + OpenSearchAdapter.similarityService, ScriptServiceProvider.getInstance(), null, null, @@ -431,6 +332,165 @@ private static QueryShardContext buildQueryShardContext( valuesSourceRegistry); } + /** + * Performs an initial load of the schema into the mapper service. This differs from the + * reloadSchema in that this first builds an entire mapping and then performs a single merge into + * the mapper service, instead of individually attempting to merge each field as it is + * encountered. + */ + public void loadSchema() { + ObjectNode rootNode = new ObjectNode(JsonNodeFactory.instance); + + for (Map.Entry entry : new TreeMap<>(chunkSchema).entrySet()) { + String fieldName = entry.getValue().name; + if (mapperService.isMetadataField(fieldName)) { + LOG.trace("Skipping metadata field '{}'", fieldName); + } else { + ObjectNode child = new ObjectNode(JsonNodeFactory.instance); + child.put("type", getFieldMapping(entry.getValue().fieldType)); + putAtPath(rootNode, child, entry.getKey()); + } + } + + try { + XContentBuilder builder = + XContentFactory.jsonBuilder().startObject().startObject("_doc").startObject("properties"); + rootNode.fields().forEachRemaining((entry) -> buildObject(builder, entry)); + builder.endObject().endObject().endObject(); + + mapperService.merge( + MapperService.SINGLE_MAPPING_NAME, + new CompressedXContent(BytesReference.bytes(builder)), + MapperService.MergeReason.MAPPING_UPDATE); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Given a root object node and a new object node, will insert the new node into the provided + * dot-separated path. If a deeply nested path is provided, this will initialize the entire tree + * as needed. + * + * @param root ObjectNode root node + * @param newNode ObjectNode to insert + * @param path Dot separated path (foo.bar) + */ + private void putAtPath(ObjectNode root, ObjectNode newNode, String path) { + String[] fieldParts = path.split("\\."); + + ObjectNode currentRef = root; + for (int i = 0; i < fieldParts.length; i++) { + if (!currentRef.has(fieldParts[i])) { + // it doesn't have the current object + if (i == fieldParts.length - 1) { + // last thing, add ours + currentRef.set(fieldParts[i], newNode); + return; + } else { + // add a parent holder + ObjectNode fields = new ObjectNode(JsonNodeFactory.instance); + fields.set("fields", new ObjectNode(JsonNodeFactory.instance)); + currentRef.set(fieldParts[i], fields); + currentRef = (ObjectNode) currentRef.get(fieldParts[i]).get("fields"); + } + + } else { + // it has the parent + if (i == fieldParts.length - 1) { + currentRef.setAll(newNode); + return; + } else { + // it has the parent, does it have a fields? + if (!currentRef.get(fieldParts[i]).has("fields")) { + ((ObjectNode) currentRef.get(fieldParts[i])) + .set("fields", new ObjectNode(JsonNodeFactory.instance)); + } + currentRef = (ObjectNode) currentRef.get(fieldParts[i]).get("fields"); + } + } + } + } + + /** + * Builds the resulting XContent from the provided map entries for use in the mapper service. For + * entries that do not have explicit field definitions (ie, foo.bar exists as a field while foo + * does not), will default to a binary type. + */ + private void buildObject(XContentBuilder builder, Map.Entry entry) { + try { + builder.startObject(entry.getKey()); + + // does it have a type field set? + if (entry.getValue().has("type")) { + builder.field("type", entry.getValue().get("type").asText()); + } else { + // todo - "default" when no field exists + builder.field("type", "binary"); + } + + // does it have fields set? + if (entry.getValue().has("fields")) { + builder.startObject("fields"); + entry + .getValue() + .get("fields") + .fields() + .forEachRemaining( + (fieldEntry) -> { + // do this same step all-over + buildObject(builder, fieldEntry); + }); + builder.endObject(); + } + + builder.endObject(); + } catch (Exception e) { + LOG.error("Error building object", e); + } + } + + /** + * Returns the corresponding OpenSearch field type as a string, given the Astra FieldType + * definition. todo - this probably should be moved into the respective FieldType enums directly + */ + private String getFieldMapping(FieldType fieldType) { + if (fieldType == FieldType.TEXT) { + return "text"; + } else if (fieldType == FieldType.STRING + || fieldType == FieldType.KEYWORD + || fieldType == FieldType.ID) { + return "keyword"; + } else if (fieldType == FieldType.IP) { + return "ip"; + } else if (fieldType == FieldType.DATE) { + return "date"; + } else if (fieldType == FieldType.BOOLEAN) { + return "boolean"; + } else if (fieldType == FieldType.DOUBLE) { + return "double"; + } else if (fieldType == FieldType.FLOAT) { + return "float"; + } else if (fieldType == FieldType.HALF_FLOAT) { + return "half_float"; + } else if (fieldType == FieldType.INTEGER) { + return "integer"; + } else if (fieldType == FieldType.LONG) { + return "long"; + } else if (fieldType == FieldType.SCALED_LONG) { + return "scaled_long"; + } else if (fieldType == FieldType.SHORT) { + return "short"; + } else if (fieldType == FieldType.BYTE) { + return "byte"; + } else if (fieldType == FieldType.BINARY) { + return "binary"; + } else { + LOG.warn("Field type '{}' is not yet currently supported", fieldType); + return null; + } + } + /** * Registers a field type and name to the MapperService for use in aggregations. This informs the * aggregators how to access a specific field and what value type it contains. registerField( @@ -495,12 +555,7 @@ private static boolean tryRegisterField( public Aggregator buildAggregatorUsingContext( AggBuilder builder, IndexSearcher indexSearcher, Query query) throws IOException { QueryShardContext queryShardContext = - buildQueryShardContext( - AstraBigArrays.getInstance(), - indexSettings, - indexSearcher, - similarityService, - mapperService); + buildQueryShardContext(AstraBigArrays.getInstance(), indexSearcher, mapperService); SearchContext searchContext = new AstraSearchContext( AstraBigArrays.getInstance(), queryShardContext, indexSearcher, query); @@ -1060,8 +1115,4 @@ protected static HistogramAggregationBuilder getHistogramAggregationBuilder( public MapperService getMapperService() { return this.mapperService; } - - public IndexSettings getIndexSettings() { - return this.indexSettings; - } } diff --git a/astra/src/main/java/com/slack/astra/logstore/opensearch/ScriptServiceProvider.java b/astra/src/main/java/com/slack/astra/logstore/opensearch/ScriptServiceProvider.java index 6d87620213..155b8d395c 100644 --- a/astra/src/main/java/com/slack/astra/logstore/opensearch/ScriptServiceProvider.java +++ b/astra/src/main/java/com/slack/astra/logstore/opensearch/ScriptServiceProvider.java @@ -27,7 +27,7 @@ public static ScriptService getInstance() { } private static ScriptService createInstance() { - IndexSettings indexSettings = OpenSearchAdapter.buildIndexSettings(); + IndexSettings indexSettings = AstraIndexSettings.getInstance(); PluginsService pluginsService = new PluginsService( indexSettings.getSettings(), Path.of(""), null, null, List.of(PainlessPlugin.class)); diff --git a/astra/src/main/java/com/slack/astra/logstore/search/LogIndexSearcherImpl.java b/astra/src/main/java/com/slack/astra/logstore/search/LogIndexSearcherImpl.java index 90147f147a..9a35b392ee 100644 --- a/astra/src/main/java/com/slack/astra/logstore/search/LogIndexSearcherImpl.java +++ b/astra/src/main/java/com/slack/astra/logstore/search/LogIndexSearcherImpl.java @@ -79,7 +79,7 @@ public void afterRefresh(boolean didRefresh) { this.searcherManager.addListener(refreshListener); // initialize the adapter with whatever the default schema is - openSearchAdapter.reloadSchema(); + openSearchAdapter.loadSchema(); } @Override diff --git a/astra/src/main/java/com/slack/astra/metadata/schema/ChunkSchema.java b/astra/src/main/java/com/slack/astra/metadata/schema/ChunkSchema.java index 5652097a78..49ba01227f 100644 --- a/astra/src/main/java/com/slack/astra/metadata/schema/ChunkSchema.java +++ b/astra/src/main/java/com/slack/astra/metadata/schema/ChunkSchema.java @@ -23,6 +23,10 @@ public static void serializeToFile(ChunkSchema chunkSchema, File file) throws IO Files.writeString(file.toPath(), serDe.toJsonStr(chunkSchema)); } + public static ChunkSchema deserializeBytes(byte[] bytes) throws IOException { + return serDe.fromJsonStr(new String(bytes)); + } + public static ChunkSchema deserializeFile(Path path) throws IOException { return serDe.fromJsonStr(Files.readString(path)); } diff --git a/astra/src/main/java/com/slack/astra/metadata/schema/ChunkSchemaSerializer.java b/astra/src/main/java/com/slack/astra/metadata/schema/ChunkSchemaSerializer.java index 25f2b0eefc..3edfd175d2 100644 --- a/astra/src/main/java/com/slack/astra/metadata/schema/ChunkSchemaSerializer.java +++ b/astra/src/main/java/com/slack/astra/metadata/schema/ChunkSchemaSerializer.java @@ -9,6 +9,9 @@ import java.util.concurrent.ConcurrentHashMap; public class ChunkSchemaSerializer implements MetadataSerializer { + + private static final JsonFormat.Parser parser = JsonFormat.parser().ignoringUnknownFields(); + private static Metadata.ChunkSchema toChunkSchemaProto(ChunkSchema chunkSchema) { final Map fieldDefProtoMap = new HashMap<>(chunkSchema.fieldDefMap.size()); @@ -49,7 +52,7 @@ public String toJsonStr(ChunkSchema chunkSchema) throws InvalidProtocolBufferExc @Override public ChunkSchema fromJsonStr(String chunkSchemaStr) throws InvalidProtocolBufferException { Metadata.ChunkSchema.Builder chunkSchemaBuilder = Metadata.ChunkSchema.newBuilder(); - JsonFormat.parser().ignoringUnknownFields().merge(chunkSchemaStr, chunkSchemaBuilder); + parser.merge(chunkSchemaStr, chunkSchemaBuilder); return fromChunkSchemaProto(chunkSchemaBuilder.build()); } } diff --git a/astra/src/test/java/com/slack/astra/chunkManager/CachingChunkManagerTest.java b/astra/src/test/java/com/slack/astra/chunkManager/CachingChunkManagerTest.java index 362f3eada8..6284e9aaa6 100644 --- a/astra/src/test/java/com/slack/astra/chunkManager/CachingChunkManagerTest.java +++ b/astra/src/test/java/com/slack/astra/chunkManager/CachingChunkManagerTest.java @@ -201,7 +201,6 @@ private void initializeBlobStorageWithIndex(String snapshotId) throws Exception filesToUpload.add(schemaFile.getName()); IndexCommit indexCommit = logStore.getIndexCommit(); filesToUpload.addAll(indexCommit.getFileNames()); - System.out.println(filesToUpload.size()); logStore.close(); assertThat(dirPath.toFile().listFiles().length).isGreaterThanOrEqualTo(filesToUpload.size());