diff --git a/pom.xml b/pom.xml index 03d8142..113ed50 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ 4.0.0 com.mule.mulechain mulechain-vectors - 0.1.87-SNAPSHOT + 0.1.88-SNAPSHOT mule-extension MAC Vectors diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/constant/Constants.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/constant/Constants.java index cc38c9d..c496809 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/constant/Constants.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/constant/Constants.java @@ -41,6 +41,9 @@ private Constants() {} public static final String VECTOR_STORE_AI_SEARCH = "AI_SEARCH"; public static final String VECTOR_STORE_NEO4J = "NEO4J"; + public static final String STORE_SCHEMA_METADATA_FIELD_NAME = "metadata"; + public static final String STORE_SCHEMA_VECTOR_FIELD_NAME = "vector"; + public static final String METADATA_KEY_SOURCE_ID = "source_id"; public static final String METADATA_KEY_INDEX = "index"; public static final String METADATA_KEY_FILE_NAME = "file_name"; diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/factory/EmbeddingStoreFactory.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/factory/EmbeddingStoreFactory.java index 3a8cc99..45eb958 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/factory/EmbeddingStoreFactory.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/factory/EmbeddingStoreFactory.java @@ -183,13 +183,4 @@ private static EmbeddingStore createWeaviateStore(String protocol, .apiKey(apiKey) .build(); } - - /*private static EmbeddingStore createNeo4JStore(String boltURL, String userName, String password, String collectionName, Integer dimension) { - return Neo4jEmbeddingStore.builder() - .withBasicAuth(boltURL, userName, password) - .dimension(dimension) - .databaseName(collectionName) - .build(); - }*/ - } diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/operation/EmbeddingOperations.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/operation/EmbeddingOperations.java index 02610ea..e91587e 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/operation/EmbeddingOperations.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/operation/EmbeddingOperations.java @@ -19,7 +19,7 @@ import dev.langchain4j.store.embedding.filter.Filter; import org.json.JSONArray; import org.json.JSONObject; -import org.mule.extension.mulechain.vectors.internal.helper.store.VectorStore; +import org.mule.extension.mulechain.vectors.internal.store.VectorStore; import org.mule.runtime.extension.api.annotation.Alias; import org.mule.runtime.extension.api.annotation.param.*; @@ -354,9 +354,9 @@ public InputStream listSourcesFromStore(String storeName, ) { EmbeddingOperationValidator.validateOperationType( - Constants.EMBEDDING_OPERATION_TYPE_FILTER_BY_METADATA,configuration.getVectorStore()); + Constants.EMBEDDING_OPERATION_TYPE_QUERY_ALL,configuration.getVectorStore()); EmbeddingOperationValidator.validateOperationType( - Constants.EMBEDDING_OPERATION_TYPE_QUERY_ALL,configuration.getVectorStore()); + Constants.EMBEDDING_OPERATION_TYPE_FILTER_BY_METADATA,configuration.getVectorStore()); VectorStore vectorStore = VectorStore.builder() .storeName(storeName) diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/VectorStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/VectorStore.java similarity index 61% rename from src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/VectorStore.java rename to src/main/java/org/mule/extension/mulechain/vectors/internal/store/VectorStore.java index f40e30f..e8aeaae 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/VectorStore.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/VectorStore.java @@ -1,4 +1,4 @@ -package org.mule.extension.mulechain.vectors.internal.helper.store; +package org.mule.extension.mulechain.vectors.internal.store; import dev.langchain4j.data.document.Metadata; import dev.langchain4j.data.embedding.Embedding; @@ -16,9 +16,14 @@ import org.mule.extension.mulechain.vectors.internal.helper.factory.EmbeddingStoreFactory; import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; -import org.mule.extension.mulechain.vectors.internal.helper.store.aisearch.AISearchStore; -import org.mule.extension.mulechain.vectors.internal.helper.store.milvus.MilvusStore; -import org.mule.extension.mulechain.vectors.internal.helper.store.pgvector.PGVectorStore; +import org.mule.extension.mulechain.vectors.internal.store.aisearch.AISearchStore; +import org.mule.extension.mulechain.vectors.internal.store.chroma.ChromaStore; +import org.mule.extension.mulechain.vectors.internal.store.elasticsearch.ElasticsearchStore; +import org.mule.extension.mulechain.vectors.internal.store.milvus.MilvusStore; +import org.mule.extension.mulechain.vectors.internal.store.opensearch.OpenSearchStore; +import org.mule.extension.mulechain.vectors.internal.store.pgvector.PGVectorStore; +import org.mule.extension.mulechain.vectors.internal.store.pinecone.PineconeStore; +import org.mule.extension.mulechain.vectors.internal.store.weviate.WeaviateStore; import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,16 +33,34 @@ import static dev.langchain4j.store.embedding.filter.MetadataFilterBuilder.metadataKey; +/** + * The {@code VectorStore} class provides a framework for interacting with various types of vector stores, + * enabling storage and retrieval of vector embeddings for data analysis and retrieval purposes. It serves as + * an abstract base for specific implementations such as Milvus, PGVector, and AI Search stores. + */ public class VectorStore { protected static final Logger LOGGER = LoggerFactory.getLogger(VectorStore.class); + protected static final String JSON_KEY_SOURCES = "sources"; + protected static final String JSON_KEY_SEGMENT_COUNT = "segmentCount"; + protected static final String JSON_KEY_SOURCE_COUNT = "sourceCount"; + protected static final String JSON_KEY_STORE_NAME = "storeName"; + protected String storeName; protected Configuration configuration; protected QueryParameters queryParams; protected EmbeddingModelNameParameters modelParams; protected EmbeddingModel embeddingModel; + /** + * Constructs a new {@code VectorStore} instance with specified configurations. + * + * @param storeName the name of the vector store + * @param configuration the configuration object containing settings for the vector store + * @param queryParams parameters for querying the vector store + * @param modelParams parameters for selecting and configuring the embedding model + */ public VectorStore(String storeName, Configuration configuration, QueryParameters queryParams, EmbeddingModelNameParameters modelParams) { this.storeName = storeName; @@ -54,17 +77,27 @@ public EmbeddingModel embeddingModel() { return this.embeddingModel; } + /** + * Retrieves the embedding model used by this vector store. Initializes the model if it is not already set. + * + * @return the embedding model used by the vector store + */ public Embedding getZeroVectorEmbedding() { // Create a general query vector (e.g., zero vector). Zero vector is often used when you need to retrieve all // embeddings without any specific bias. - float[] queryVector = new float[embeddingModel.dimension()]; - for (int i = 0; i < embeddingModel.dimension(); i++) { + float[] queryVector = new float[embeddingModel().dimension()]; + for (int i = 0; i < embeddingModel().dimension(); i++) { queryVector[i]=0.0f; // Zero vector } return new Embedding(queryVector); } + /** + * Retrieves a JSON object listing sources available in the vector store, including metadata for each source. + * + * @return a JSON object containing a list of sources and their metadata + */ public JSONObject listSources() { dev.langchain4j.store.embedding.EmbeddingStore @@ -73,11 +106,11 @@ public JSONObject listSources() { Embedding queryEmbedding = getZeroVectorEmbedding(); JSONObject jsonObject = new JSONObject(); - jsonObject.put("storeName", storeName); + jsonObject.put(JSON_KEY_STORE_NAME, storeName); JSONArray sources = new JSONArray(); List> embeddingMatches = null; - HashMap sourcesJSONObjectHashMap = new HashMap(); + HashMap sourceObjectMap = new HashMap(); String lowerBoundaryIngestionDateTime = "0000-00-00T00:00:00.000Z"; int lowerBoundaryIndex = -1; @@ -87,8 +120,7 @@ public JSONObject listSources() { LOGGER.debug("Embedding page filter: lowerBoundaryIngestionDateTime: " + lowerBoundaryIngestionDateTime + ", lowerBoundaryIndex: " + lowerBoundaryIndex); - Filter - condition1 = metadataKey(Constants.METADATA_KEY_INGESTION_DATETIME).isGreaterThanOrEqualTo(lowerBoundaryIngestionDateTime); + Filter condition1 = metadataKey(Constants.METADATA_KEY_INGESTION_DATETIME).isGreaterThanOrEqualTo(lowerBoundaryIngestionDateTime); Filter condition2 = metadataKey(Constants.METADATA_KEY_INDEX).isGreaterThan(String.valueOf(lowerBoundaryIndex)); // Index must be handled as a String Filter condition3 = metadataKey(Constants.METADATA_KEY_INGESTION_DATETIME).isGreaterThan(lowerBoundaryIngestionDateTime); @@ -129,7 +161,7 @@ public JSONObject listSources() { } JSONObject sourceObject = new JSONObject(); - sourceObject.put("segmentCount", Integer.parseInt(index) + 1); + sourceObject.put(JSON_KEY_SEGMENT_COUNT, Integer.parseInt(index) + 1); sourceObject.put(Constants.METADATA_KEY_SOURCE_ID, sourceId); sourceObject.put(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH, absoluteDirectoryPath); sourceObject.put(Constants.METADATA_KEY_FULL_PATH, fullPath); @@ -137,26 +169,8 @@ public JSONObject listSources() { sourceObject.put(Constants.METADATA_KEY_URL, url); sourceObject.put(Constants.METADATA_KEY_INGESTION_DATETIME, ingestionDatetime); - String sourceUniqueKey = getSourceUniqueKey(sourceObject); - - // Add sourceObject to sources only if it has at least one key-value pair and it's possible to generate a key - if (!sourceObject.isEmpty() && sourceUniqueKey != null && !sourceUniqueKey.isEmpty()) { - - // Overwrite sourceObject if current one has a greater index (greatest index represents the number of segments) - if(sourcesJSONObjectHashMap.containsKey(sourceUniqueKey)){ - - int currentSegmentCount = Integer.parseInt(index) + 1; - int storedSegmentCount = (int) sourcesJSONObjectHashMap.get(sourceUniqueKey).get("segmentCount"); - if(currentSegmentCount > storedSegmentCount) { + addOrUpdateSourceObjectIntoSourceObjectMap(sourceObjectMap, sourceObject); - sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject); - } - - } else { - - sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject); - } - } currentPageEmbeddingId = match.embeddingId(); } @@ -168,8 +182,8 @@ public JSONObject listSources() { } while(embeddingMatches.size() == queryParams.embeddingPageSize()); - jsonObject.put("sources", JsonUtils.jsonObjectCollectionToJsonArray(sourcesJSONObjectHashMap.values())); - jsonObject.put("sourceCount", sourcesJSONObjectHashMap.size()); + jsonObject.put(JSON_KEY_SOURCES, JsonUtils.jsonObjectCollectionToJsonArray(sourceObjectMap.values())); + jsonObject.put(JSON_KEY_SOURCE_COUNT, sourceObjectMap.size()); return jsonObject; } @@ -206,6 +220,44 @@ protected String getSourceUniqueKey(JSONObject sourceObject) { return !sourceId.isEmpty() ? sourceId : alternativeKey; } + /** + * Adds or updates a source object into the source object map. + * + * @param sourceObjectMap The map of source objects keyed by their unique keys. + * @param sourceObject The source object to add or update. + */ + protected void addOrUpdateSourceObjectIntoSourceObjectMap(HashMap sourceObjectMap, JSONObject sourceObject) { + + String sourceUniqueKey = getSourceUniqueKey(sourceObject); + + // Add sourceObject to sources only if it has at least one key-value pair and it's possible to generate a key + if (!sourceObject.isEmpty() && sourceUniqueKey != null && !sourceUniqueKey.isEmpty()) { + // Overwrite sourceObject if current one has a greater index (greatest index represents the number of segments) + if(sourceObjectMap.containsKey(sourceUniqueKey)){ + // Get current index + int currentSegmentCount = sourceObject.getInt(JSON_KEY_SEGMENT_COUNT); + // Get previously stored index + int storedSegmentCount = (int) sourceObjectMap.get(sourceUniqueKey).get(JSON_KEY_SEGMENT_COUNT); + // Check if object need to be updated + if(currentSegmentCount > storedSegmentCount) { + sourceObjectMap.put(sourceUniqueKey, sourceObject); + } + } else { + sourceObjectMap.put(sourceUniqueKey, sourceObject); + } + } + } + + /** + * Extracts and organizes metadata fields from a given JSON object to create a structured source object. + *

+ * The generated source object includes keys such as source ID, file name, URL, full path, and ingestion + * datetime, among others. + *

+ * + * @param metadataObject a {@code JSONObject} containing metadata fields. + * @return a {@code JSONObject} with organized metadata for a source. + */ protected JSONObject getSourceObject(JSONObject metadataObject) { String sourceId = metadataObject.has(Constants.METADATA_KEY_SOURCE_ID) ? metadataObject.getString(Constants.METADATA_KEY_SOURCE_ID) : null; @@ -218,7 +270,7 @@ protected JSONObject getSourceObject(JSONObject metadataObject) { String ingestionDatetime = metadataObject.has(Constants.METADATA_KEY_INGESTION_DATETIME) ? metadataObject.getString(Constants.METADATA_KEY_INGESTION_DATETIME) : null; JSONObject sourceObject = new JSONObject(); - sourceObject.put("segmentCount", Integer.parseInt(index) + 1); + sourceObject.put(JSON_KEY_SEGMENT_COUNT, Integer.parseInt(index) + 1); sourceObject.put(Constants.METADATA_KEY_SOURCE_ID, sourceId); sourceObject.put(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH, absoluteDirectoryPath); sourceObject.put(Constants.METADATA_KEY_SOURCE, source); @@ -230,11 +282,28 @@ protected JSONObject getSourceObject(JSONObject metadataObject) { return sourceObject; } + /** + * Provides a {@link Builder} instance for configuring and creating {@code VectorStore} objects. + *

+ * The builder pattern allows for more flexible and readable configuration of a {@code VectorStore}. + * Use this to set parameters such as the store name, configuration, query parameters, and embedding model. + *

+ * + * @return a new {@code Builder} instance. + */ public static Builder builder() { return new Builder(); } + /** + * Builder class for creating instances of {@link VectorStore}. + *

+ * The {@code Builder} class allows you to set various configuration parameters before + * creating a {@code VectorStore} instance. These parameters include the store name, + * configuration settings, query parameters, and embedding model details. + *

+ */ public static class Builder { private String storeName; @@ -247,31 +316,72 @@ public Builder() { } + /** + * Sets the store name for the {@code VectorStore}. + * + * @param storeName the name of the vector store. + * @return the {@code Builder} instance, for method chaining. + */ public Builder storeName(String storeName) { this.storeName = storeName; return this; } + /** + * Sets the configuration for the {@code VectorStore}. + * + * @param configuration the configuration parameters. + * @return the {@code Builder} instance, for method chaining. + */ public Builder configuration(Configuration configuration) { this.configuration = configuration; return this; } + /** + * Sets the query parameters for embedding searches. + * + * @param queryParams the query parameters to use. + * @return the {@code Builder} instance, for method chaining. + */ public Builder queryParams(QueryParameters queryParams) { this.queryParams = queryParams; return this; } + /** + * Sets the parameters for the embedding model. + * + * @param modelParams parameters for selecting and configuring the embedding model. + * @return the {@code Builder} instance, for method chaining. + */ public Builder modelParams(EmbeddingModelNameParameters modelParams) { this.modelParams = modelParams; return this; } + /** + * Sets a pre-configured embedding model for the {@code VectorStore}. + * + * @param embeddingModel the embedding model to use. + * @return the {@code Builder} instance, for method chaining. + */ public Builder embeddingModel(EmbeddingModel embeddingModel) { this.embeddingModel = embeddingModel; return this; } + /** + * Builds and returns a new {@link VectorStore} instance based on the builder's configuration. + *

+ * Depending on the specified configuration, it returns an instance of the appropriate + * store class (e.g., {@link MilvusStore}, {@link PGVectorStore}, or {@link AISearchStore}). + * If no matching store configuration is found, it returns a default {@code VectorStore} instance. + *

+ * + * @return a {@code VectorStore} instance. + * @throws IllegalArgumentException if the configured vector store is unsupported. + */ public VectorStore build() { VectorStore embeddingStore; @@ -293,15 +403,29 @@ public VectorStore build() { embeddingStore = new AISearchStore(storeName, configuration, queryParams, modelParams); break; + case Constants.VECTOR_STORE_WEAVIATE: + + embeddingStore = new WeaviateStore(storeName, configuration, queryParams, modelParams); + break; + case Constants.VECTOR_STORE_CHROMA: + embeddingStore = new ChromaStore(storeName, configuration, queryParams, modelParams); + break; + case Constants.VECTOR_STORE_PINECONE: + embeddingStore = new PineconeStore(storeName, configuration, queryParams, modelParams); + break; + case Constants.VECTOR_STORE_ELASTICSEARCH: - case Constants.VECTOR_STORE_WEAVIATE: + embeddingStore = new ElasticsearchStore(storeName, configuration, queryParams, modelParams); + break; + + case Constants.VECTOR_STORE_OPENSEARCH: - embeddingStore = new VectorStore(storeName, configuration, queryParams, modelParams); + embeddingStore = new OpenSearchStore(storeName, configuration, queryParams, modelParams); break; default: diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/aisearch/AISearchStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/aisearch/AISearchStore.java similarity index 70% rename from src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/aisearch/AISearchStore.java rename to src/main/java/org/mule/extension/mulechain/vectors/internal/store/aisearch/AISearchStore.java index e04c3c6..2b1cf35 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/aisearch/AISearchStore.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/aisearch/AISearchStore.java @@ -1,4 +1,4 @@ -package org.mule.extension.mulechain.vectors.internal.helper.store.aisearch; +package org.mule.extension.mulechain.vectors.internal.store.aisearch; import org.json.JSONArray; import org.json.JSONObject; @@ -6,7 +6,7 @@ import org.mule.extension.mulechain.vectors.internal.constant.Constants; import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; -import org.mule.extension.mulechain.vectors.internal.helper.store.VectorStore; +import org.mule.extension.mulechain.vectors.internal.store.VectorStore; import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; import java.io.BufferedReader; @@ -36,12 +36,12 @@ public AISearchStore(String storeName, Configuration configuration, QueryParamet public JSONObject listSources() { - HashMap sourcesJSONObjectHashMap = new HashMap(); + HashMap sourceObjectMap = new HashMap(); JSONObject jsonObject = new JSONObject(); - jsonObject.put("storeName", storeName); + jsonObject.put(JSON_KEY_STORE_NAME, storeName); - int segmentCount = 0; // Counter to track the number of documents processed + int segmentCount = 0; // Counter to track the number of segments processed int offset = 0; // Initialize offset for pagination try { @@ -52,7 +52,7 @@ public JSONObject listSources() { do { // Construct the URL with $top and $skip for pagination String urlString = this.url + "/indexes/" + storeName + "/docs?search=*&$top=" + queryParams.embeddingPageSize() + - "&$skip=" + offset + "&$select=id,metadata&api-version=" + API_VERSION; + "&$skip=" + offset + "&$select=id," + Constants.STORE_SCHEMA_METADATA_FIELD_NAME + "&api-version=" + API_VERSION; // Nested loop to handle each page of results while (urlString != null) { @@ -86,7 +86,7 @@ public JSONObject listSources() { JSONObject document = documents.getJSONObject(i); String id = document.getString("id"); // Document ID - JSONObject metadata = document.optJSONObject("metadata"); // Metadata of the document + JSONObject metadata = document.optJSONObject(Constants.STORE_SCHEMA_METADATA_FIELD_NAME); // Metadata of the document if (metadata != null) { @@ -94,36 +94,15 @@ public JSONObject listSources() { JSONArray attributes = metadata.optJSONArray("attributes"); JSONObject metadataObject = new JSONObject(); // Object to store key-value pairs from attributes - - int index =0; // Iterate over attributes array to populate sourceObject for (int j = 0; j < attributes.length(); j++) { + JSONObject attribute = attributes.getJSONObject(j); metadataObject.put(attribute.getString("key"), attribute.get("value")); - if(attribute.getString("key").compareTo(Constants.METADATA_KEY_INDEX) == 0) { - index = Integer.parseInt(metadataObject.getString("index")); - } } JSONObject sourceObject = getSourceObject(metadataObject); - String sourceUniqueKey = getSourceUniqueKey(sourceObject); - - // Add sourceObject to sources only if it has at least one key-value pair and it's possible to generate a key - if (!sourceObject.isEmpty() && sourceUniqueKey != null && !sourceUniqueKey.isEmpty()) { - // Overwrite sourceObject if current one has a greater index (greatest index represents the number of segments) - if(sourcesJSONObjectHashMap.containsKey(sourceUniqueKey)){ - // Get current index - int currentSegmentCount = index + 1; - // Get previously stored index - int storedSegmentCount = (int) sourcesJSONObjectHashMap.get(sourceUniqueKey).get("segmentCount"); - // Check if object need to be updated - if(currentSegmentCount > storedSegmentCount) { - sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject); - } - } else { - sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject); - } - } + addOrUpdateSourceObjectIntoSourceObjectMap(sourceObjectMap, sourceObject); LOGGER.debug("sourceObject: " + sourceObject); segmentCount++; // Increment document count @@ -153,7 +132,7 @@ public JSONObject listSources() { } while (hasMore); // Continue if more pages are available // Output total count of processed documents - LOGGER.debug("segmentCount: " + segmentCount); + LOGGER.debug(JSON_KEY_SEGMENT_COUNT + ": " + segmentCount); } catch (Exception e) { @@ -161,8 +140,8 @@ public JSONObject listSources() { LOGGER.error("Error while listing sources", e); } - jsonObject.put("sources", JsonUtils.jsonObjectCollectionToJsonArray(sourcesJSONObjectHashMap.values())); - jsonObject.put("sourceCount", sourcesJSONObjectHashMap.size()); + jsonObject.put(JSON_KEY_SOURCES, JsonUtils.jsonObjectCollectionToJsonArray(sourceObjectMap.values())); + jsonObject.put(JSON_KEY_SOURCE_COUNT, sourceObjectMap.size()); return jsonObject; } diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/chroma/ChromaStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/chroma/ChromaStore.java new file mode 100644 index 0000000..56d52f6 --- /dev/null +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/chroma/ChromaStore.java @@ -0,0 +1,177 @@ +package org.mule.extension.mulechain.vectors.internal.store.chroma; + +import org.json.JSONObject; +import org.mule.extension.mulechain.vectors.internal.config.Configuration; +import org.mule.extension.mulechain.vectors.internal.constant.Constants; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; +import org.mule.extension.mulechain.vectors.internal.store.VectorStore; +import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.HashMap; + +/** + * ChromaStore is a specialized implementation of {@link VectorStore} designed to interact with + * the Chroma database for managing vector data and sources. + */ +public class ChromaStore extends VectorStore { + + private String url; + + + /** + * Initializes a new instance of ChromaStore. + * + * @param storeName the name of the vector store. + * @param configuration the configuration object containing necessary settings. + * @param queryParams parameters related to query configurations. + * @param modelParams parameters for embedding model configurations. + */ + public ChromaStore(String storeName, Configuration configuration, QueryParameters queryParams, EmbeddingModelNameParameters modelParams) { + + super(storeName, configuration, queryParams, modelParams); + + JSONObject config = JsonUtils.readConfigFile(configuration.getConfigFilePath()); + JSONObject vectorStoreConfig = config.getJSONObject(Constants.VECTOR_STORE_ELASTICSEARCH); + this.url = vectorStoreConfig.getString("CHROMA_URL"); + } + + /** + * Retrieves a JSON object listing all sources associated with the store. + * + * @return a {@link JSONObject} containing details of all sources. + */ + public JSONObject listSources() { + + HashMap sourceObjectMap = new HashMap(); + + JSONObject jsonObject = new JSONObject(); + jsonObject.put(JSON_KEY_STORE_NAME, storeName); + + long segmentCount = 0; // Counter to track the number of segments processed + long offset = 0; // Initialize offset for pagination + + try { + + String collectionId = getCollectionId(storeName); + segmentCount = getSegmentCount(collectionId); + + while(offset < segmentCount) { + + } + + } catch (Exception e) { + + // Handle any exceptions that occur during the process + LOGGER.error("Error while listing sources", e); + } + + jsonObject.put(JSON_KEY_SOURCES, JsonUtils.jsonObjectCollectionToJsonArray(sourceObjectMap.values())); + jsonObject.put(JSON_KEY_SOURCE_COUNT, sourceObjectMap.size()); + + return jsonObject; + } + + /** + * Retrieves the total number of segments in the specified collection. + * + * @param collectionId the ID of the collection. + * @return the segment count as a {@code long}. + */ + private long getSegmentCount(String collectionId) { + + long segmentCount = 0; + try { + + String urlString = url + "/api/v1/collections/" + collectionId + "/count"; + URL url = new URL(urlString); + + // Open connection and configure HTTP request + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); + connection.setRequestProperty("Content-Type", "application/json"); + + // Check the response code and handle accordingly + if (connection.getResponseCode() == 200) { + + BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream())); + StringBuilder responseBuilder = new StringBuilder(); + String line; + + // Read response line by line + while ((line = in.readLine()) != null) { + responseBuilder.append(line); + } + in.close(); + segmentCount = Long.parseLong(responseBuilder.toString()); + + } else { + + // Log any error responses from the server + LOGGER.error("Error: " + connection.getResponseCode() + " " + connection.getResponseMessage()); + } + + } catch (Exception e) { + + // Handle any exceptions that occur during the process + LOGGER.error("Error getting collection count", e); + } + LOGGER.debug("segmentCount: " + segmentCount); + return segmentCount; + } + + /** + * Retrieves the collection ID for a given store name. + * + * @param storeName the name of the store. + * @return the collection ID as a {@code String}. + */ + private String getCollectionId(String storeName) { + + String collectionId = ""; + try { + + String urlString = url + "/api/v1/collections/" + storeName; + URL url = new URL(urlString); + + // Open connection and configure HTTP request + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); + connection.setRequestProperty("Content-Type", "application/json"); + + // Check the response code and handle accordingly + if (connection.getResponseCode() == 200) { + + BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream())); + StringBuilder responseBuilder = new StringBuilder(); + String line; + + // Read response line by line + while ((line = in.readLine()) != null) { + responseBuilder.append(line); + } + in.close(); + + // Parse JSON response + JSONObject jsonResponse = new JSONObject(responseBuilder.toString()); + collectionId = jsonResponse.getString("id"); + + } else { + + // Log any error responses from the server + LOGGER.error("Error: " + connection.getResponseCode() + " " + connection.getResponseMessage()); + } + + } catch (Exception e) { + + // Handle any exceptions that occur during the process + LOGGER.error("Error getting collection id", e); + } + LOGGER.debug("collectionId: " + collectionId); + return collectionId; + } +} diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/elasticsearch/ElasticsearchStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/elasticsearch/ElasticsearchStore.java new file mode 100644 index 0000000..d5a47b9 --- /dev/null +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/elasticsearch/ElasticsearchStore.java @@ -0,0 +1,27 @@ +package org.mule.extension.mulechain.vectors.internal.store.elasticsearch; + +import org.json.JSONObject; +import org.mule.extension.mulechain.vectors.internal.config.Configuration; +import org.mule.extension.mulechain.vectors.internal.constant.Constants; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; +import org.mule.extension.mulechain.vectors.internal.store.VectorStore; +import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; + +public class ElasticsearchStore extends VectorStore { + + private String url; + private String userName; + private String password; + + public ElasticsearchStore(String storeName, Configuration configuration, QueryParameters queryParams, EmbeddingModelNameParameters modelParams) { + + super(storeName, configuration, queryParams, modelParams); + + JSONObject config = JsonUtils.readConfigFile(configuration.getConfigFilePath()); + JSONObject vectorStoreConfig = config.getJSONObject(Constants.VECTOR_STORE_ELASTICSEARCH); + this.url = vectorStoreConfig.getString("ELASTICSEARCH_URL"); + this.userName = vectorStoreConfig.getString("ELASTICSEARCH_USER"); + this.password = vectorStoreConfig.getString("ELASTICSEARCH_PASSWORD"); + } +} diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/milvus/MilvusStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/milvus/MilvusStore.java similarity index 62% rename from src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/milvus/MilvusStore.java rename to src/main/java/org/mule/extension/mulechain/vectors/internal/store/milvus/MilvusStore.java index 3b40428..c6682bb 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/milvus/MilvusStore.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/milvus/MilvusStore.java @@ -1,4 +1,4 @@ -package org.mule.extension.mulechain.vectors.internal.helper.store.milvus; +package org.mule.extension.mulechain.vectors.internal.store.milvus; import com.google.gson.JsonObject; import io.milvus.client.MilvusServiceClient; @@ -7,13 +7,12 @@ import io.milvus.param.dml.QueryIteratorParam; import io.milvus.param.R; import io.milvus.response.QueryResultsWrapper; -import org.json.JSONArray; import org.json.JSONObject; import org.mule.extension.mulechain.vectors.internal.config.Configuration; import org.mule.extension.mulechain.vectors.internal.constant.Constants; import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; -import org.mule.extension.mulechain.vectors.internal.helper.store.VectorStore; +import org.mule.extension.mulechain.vectors.internal.store.VectorStore; import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; import java.util.ArrayList; @@ -38,10 +37,10 @@ public MilvusStore(String storeName, Configuration configuration, QueryParameter public JSONObject listSources() { - HashMap sourcesJSONObjectHashMap = new HashMap(); + HashMap sourceObjectMap = new HashMap(); JSONObject jsonObject = new JSONObject(); - jsonObject.put("storeName", storeName); + jsonObject.put(JSON_KEY_STORE_NAME, storeName); // Specify the host and port for the Milvus server ConnectParam connectParam = ConnectParam.newBuilder() @@ -58,7 +57,7 @@ public JSONObject listSources() { QueryIteratorParam iteratorParam = QueryIteratorParam.newBuilder() .withCollectionName(storeName) .withBatchSize((long)queryParams.embeddingPageSize()) - .withOutFields(Arrays.asList("metadata")) + .withOutFields(Arrays.asList(Constants.STORE_SCHEMA_METADATA_FIELD_NAME)) .build(); R queryIteratorRes = client.queryIterator(iteratorParam); @@ -82,31 +81,10 @@ public JSONObject listSources() { for (QueryResultsWrapper.RowRecord rowRecord : batchResults) { - JsonObject gsonObject = (JsonObject)rowRecord.getFieldValues().get("metadata"); + JsonObject gsonObject = (JsonObject)rowRecord.getFieldValues().get(Constants.STORE_SCHEMA_METADATA_FIELD_NAME); JSONObject metadataObject = new JSONObject(gsonObject.toString()); - - String index = metadataObject.has(Constants.METADATA_KEY_INDEX) ? metadataObject.getString(Constants.METADATA_KEY_INDEX) : null; JSONObject sourceObject = getSourceObject(metadataObject); - - String sourceUniqueKey = getSourceUniqueKey(sourceObject); - - // Add sourceObject to sources only if it has at least one key-value pair and it's possible to generate a key - if (!sourceObject.isEmpty() && sourceUniqueKey != null && !sourceUniqueKey.isEmpty()) { - // Overwrite sourceObject if current one has a greater index (greatest index represents the number of segments) - if(sourcesJSONObjectHashMap.containsKey(sourceUniqueKey)){ - // Get current index - int currentSegmentCount = Integer.parseInt(index) + 1; - // Get previously stored index - int storedSegmentCount = (int) sourcesJSONObjectHashMap.get(sourceUniqueKey).get("segmentCount"); - // Check if object need to be updated - if(currentSegmentCount > storedSegmentCount) { - sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject); - } - } else { - sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject); - } - } - + addOrUpdateSourceObjectIntoSourceObjectMap(sourceObjectMap, sourceObject); } } } @@ -114,8 +92,8 @@ public JSONObject listSources() { client.close(); } - jsonObject.put("sources", JsonUtils.jsonObjectCollectionToJsonArray(sourcesJSONObjectHashMap.values())); - jsonObject.put("sourceCount", sourcesJSONObjectHashMap.size()); + jsonObject.put(JSON_KEY_SOURCES, JsonUtils.jsonObjectCollectionToJsonArray(sourceObjectMap.values())); + jsonObject.put(JSON_KEY_SOURCE_COUNT, sourceObjectMap.size()); return jsonObject; } diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/opensearch/OpenSearchStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/opensearch/OpenSearchStore.java new file mode 100644 index 0000000..6753f15 --- /dev/null +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/opensearch/OpenSearchStore.java @@ -0,0 +1,27 @@ +package org.mule.extension.mulechain.vectors.internal.store.opensearch; + +import org.json.JSONObject; +import org.mule.extension.mulechain.vectors.internal.config.Configuration; +import org.mule.extension.mulechain.vectors.internal.constant.Constants; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; +import org.mule.extension.mulechain.vectors.internal.store.VectorStore; +import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; + +public class OpenSearchStore extends VectorStore { + + private String url; + private String userName; + private String password; + + public OpenSearchStore(String storeName, Configuration configuration, QueryParameters queryParams, EmbeddingModelNameParameters modelParams) { + + super(storeName, configuration, queryParams, modelParams); + + JSONObject config = JsonUtils.readConfigFile(configuration.getConfigFilePath()); + JSONObject vectorStoreConfig = config.getJSONObject(Constants.VECTOR_STORE_OPENSEARCH); + this.url = vectorStoreConfig.getString("OPENSEARCH_URL"); + this.userName = vectorStoreConfig.getString("OPENSEARCH_USER"); + this.password = vectorStoreConfig.getString("OPENSEARCH_PASSWORD"); + } +} diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/pgvector/PGVectorStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/pgvector/PGVectorStore.java similarity index 76% rename from src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/pgvector/PGVectorStore.java rename to src/main/java/org/mule/extension/mulechain/vectors/internal/store/pgvector/PGVectorStore.java index 8b47120..7bcf9f9 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/pgvector/PGVectorStore.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/pgvector/PGVectorStore.java @@ -1,11 +1,11 @@ -package org.mule.extension.mulechain.vectors.internal.helper.store.pgvector; +package org.mule.extension.mulechain.vectors.internal.store.pgvector; import org.json.JSONObject; import org.mule.extension.mulechain.vectors.internal.config.Configuration; import org.mule.extension.mulechain.vectors.internal.constant.Constants; import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; -import org.mule.extension.mulechain.vectors.internal.helper.store.VectorStore; +import org.mule.extension.mulechain.vectors.internal.store.VectorStore; import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; import org.postgresql.ds.PGSimpleDataSource; import org.slf4j.Logger; @@ -63,44 +63,24 @@ public PGVectorStore(String storeName, Configuration configuration, QueryParamet */ public JSONObject listSources() { - HashMap sourcesJSONObjectHashMap = new HashMap<>(); + HashMap sourceObjectMap = new HashMap<>(); JSONObject jsonObject = new JSONObject(); - jsonObject.put("storeName", storeName); + jsonObject.put(JSON_KEY_STORE_NAME, storeName); try (PgVectorMetadataIterator iterator = new PgVectorMetadataIterator(userName, password, host, port, database, storeName, (int)queryParams.embeddingPageSize())) { while (iterator.hasNext()) { JSONObject metadataObject = new JSONObject(iterator.next()); - - String index = metadataObject.has(Constants.METADATA_KEY_INDEX) ? metadataObject.getString(Constants.METADATA_KEY_INDEX) : null; JSONObject sourceObject = getSourceObject(metadataObject); - - String sourceUniqueKey = getSourceUniqueKey(sourceObject); - - // Add sourceObject to sources only if it has at least one key-value pair and it's possible to generate a key - if (!sourceObject.isEmpty() && sourceUniqueKey != null && !sourceUniqueKey.isEmpty()) { - // Overwrite sourceObject if current one has a greater index (greatest index represents the number of segments) - if(sourcesJSONObjectHashMap.containsKey(sourceUniqueKey)){ - // Get current index - int currentSegmentCount = Integer.parseInt(index) + 1; - // Get previously stored index - int storedSegmentCount = (int) sourcesJSONObjectHashMap.get(sourceUniqueKey).get("segmentCount"); - // Check if object need to be updated - if(currentSegmentCount > storedSegmentCount) { - sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject); - } - } else { - sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject); - } - } + addOrUpdateSourceObjectIntoSourceObjectMap(sourceObjectMap, sourceObject); } } catch (SQLException e) { LOGGER.error("Error while listing sources", e); } - jsonObject.put("sources", JsonUtils.jsonObjectCollectionToJsonArray(sourcesJSONObjectHashMap.values())); - jsonObject.put("sourceCount", sourcesJSONObjectHashMap.size()); + jsonObject.put(JSON_KEY_SOURCES, JsonUtils.jsonObjectCollectionToJsonArray(sourceObjectMap.values())); + jsonObject.put(JSON_KEY_SOURCE_COUNT, sourceObjectMap.size()); return jsonObject; } @@ -108,7 +88,7 @@ public JSONObject listSources() { /** * Iterator to handle metadata pagination from the PostgreSQL database. */ - public class PgVectorMetadataIterator implements Iterator, AutoCloseable { + private class PgVectorMetadataIterator implements Iterator, AutoCloseable { private int offset = 0; // Current offset for pagination private ResultSet resultSet; @@ -129,7 +109,7 @@ public class PgVectorMetadataIterator implements Iterator, AutoCloseable * @param pageSize The number of rows per page for pagination. * @throws SQLException If a database error occurs. */ - public PgVectorMetadataIterator(String userName, String password, String host, int port, String database, String table, int pageSize) throws SQLException { + private PgVectorMetadataIterator(String userName, String password, String host, int port, String database, String table, int pageSize) throws SQLException { // Initialize the connection and the first page of data PGSimpleDataSource dataSource = new PGSimpleDataSource(); @@ -157,7 +137,7 @@ private void fetchNextPage() throws SQLException { pstmt.close(); } - String query = "SELECT metadata FROM " + table + " LIMIT ? OFFSET ?"; + String query = "SELECT " + Constants.STORE_SCHEMA_METADATA_FIELD_NAME + " FROM " + table + " LIMIT ? OFFSET ?"; pstmt = connection.prepareStatement(query); pstmt.setInt(1, this.pageSize); pstmt.setInt(2, offset); @@ -198,7 +178,7 @@ public String next() { if (resultSet == null) { throw new NoSuchElementException("No more elements available"); } - return resultSet.getString("metadata"); + return resultSet.getString(Constants.STORE_SCHEMA_METADATA_FIELD_NAME); } catch (SQLException e) { LOGGER.error("Error retrieving next element", e); throw new NoSuchElementException("Error retrieving next element"); diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/pinecone/PineconeStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/pinecone/PineconeStore.java new file mode 100644 index 0000000..2712507 --- /dev/null +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/pinecone/PineconeStore.java @@ -0,0 +1,27 @@ +package org.mule.extension.mulechain.vectors.internal.store.pinecone; + +import org.json.JSONObject; +import org.mule.extension.mulechain.vectors.internal.config.Configuration; +import org.mule.extension.mulechain.vectors.internal.constant.Constants; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; +import org.mule.extension.mulechain.vectors.internal.store.VectorStore; +import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; + +public class PineconeStore extends VectorStore { + + private String apiKey; + private String cloud; + private String cloudRegion; + + public PineconeStore(String storeName, Configuration configuration, QueryParameters queryParams, EmbeddingModelNameParameters modelParams) { + + super(storeName, configuration, queryParams, modelParams); + + JSONObject config = JsonUtils.readConfigFile(configuration.getConfigFilePath()); + JSONObject vectorStoreConfig = config.getJSONObject(Constants.VECTOR_STORE_PINECONE); + this.apiKey = vectorStoreConfig.getString("PINECONE_APIKEY"); + this.cloud = vectorStoreConfig.getString("PINECONE_SERVERLESS_CLOUD"); + this.cloudRegion = vectorStoreConfig.getString("PINECONE_SERVERLESS_REGION"); + } +} diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/weviate/WeaviateStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/weviate/WeaviateStore.java new file mode 100644 index 0000000..cc9ee8c --- /dev/null +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/weviate/WeaviateStore.java @@ -0,0 +1,28 @@ +package org.mule.extension.mulechain.vectors.internal.store.weviate; + +import org.json.JSONObject; +import org.mule.extension.mulechain.vectors.internal.config.Configuration; +import org.mule.extension.mulechain.vectors.internal.constant.Constants; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; +import org.mule.extension.mulechain.vectors.internal.store.VectorStore; + +import static org.mule.extension.mulechain.vectors.internal.util.JsonUtils.readConfigFile; + +public class WeaviateStore extends VectorStore { + + private String host; + private String protocol; + private String apiKey; + + public WeaviateStore(String storeName, Configuration configuration, QueryParameters queryParams, EmbeddingModelNameParameters modelParams) { + + super(storeName, configuration, queryParams, modelParams); + + JSONObject config = readConfigFile(configuration.getConfigFilePath()); + JSONObject vectorStoreConfig = config.getJSONObject(Constants.VECTOR_STORE_WEAVIATE); + this.host = vectorStoreConfig.getString("WEAVIATE_HOST"); + this.protocol = vectorStoreConfig.getString("WEAVIATE_PROTOCOL"); + this.apiKey = vectorStoreConfig.getString("WEAVIATE_APIKEY"); + } +}