diff --git a/pom.xml b/pom.xml index 6a3612b..a78f4d6 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ 4.0.0 com.mule.mulechain mulechain-vectors - 0.1.85-SNAPSHOT + 0.1.86-SNAPSHOT mule-extension MAC Vectors diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/EmbeddingOperationValidator.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/EmbeddingOperationValidator.java index 362d340..64128d6 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/EmbeddingOperationValidator.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/EmbeddingOperationValidator.java @@ -74,7 +74,7 @@ public class EmbeddingOperationValidator { EMBEDDING_OPERATION_TYPE_TO_SUPPORTED_VECTOR_STORES.put(Constants.EMBEDDING_OPERATION_TYPE_QUERY_ALL, new HashSet<>(Arrays.asList( - // Constants.VECTOR_STORE_PGVECTOR, // Needs to be tested + Constants.VECTOR_STORE_PGVECTOR, // Constants.VECTOR_STORE_ELASTICSEARCH, // Needs to be tested Constants.VECTOR_STORE_MILVUS // Constants.VECTOR_STORE_CHROMA, // Needs to be tested diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/VectorStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/VectorStore.java index 2a029cd..56d6390 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/VectorStore.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/VectorStore.java @@ -17,6 +17,7 @@ import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; import org.mule.extension.mulechain.vectors.internal.helper.store.milvus.MilvusVectorStore; +import org.mule.extension.mulechain.vectors.internal.helper.store.milvus.PGVectorVectorStore; import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; import org.mule.runtime.module.extension.internal.runtime.operation.IllegalOperationException; import org.slf4j.Logger; @@ -203,7 +204,30 @@ protected String getSourceUniqueKey(JSONObject sourceObject) { return !sourceId.isEmpty() ? sourceId : alternativeKey; } + protected JSONObject getSourceObject(JSONObject metadataObject) { + + String sourceId = metadataObject.has(Constants.METADATA_KEY_SOURCE_ID) ? metadataObject.getString(Constants.METADATA_KEY_SOURCE_ID) : null; + String index = metadataObject.has(Constants.METADATA_KEY_INDEX) ? metadataObject.getString(Constants.METADATA_KEY_INDEX) : null; + String fileName = metadataObject.has(Constants.METADATA_KEY_FILE_NAME) ? metadataObject.getString(Constants.METADATA_KEY_FILE_NAME) : null; + String url = metadataObject.has(Constants.METADATA_KEY_URL) ? metadataObject.getString(Constants.METADATA_KEY_URL) : null; + String fullPath = metadataObject.has(Constants.METADATA_KEY_FULL_PATH) ? metadataObject.getString(Constants.METADATA_KEY_FULL_PATH) : null; + String absoluteDirectoryPath = metadataObject.has(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH) ? metadataObject.getString(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH) : null; + String ingestionDatetime = metadataObject.has(Constants.METADATA_KEY_INGESTION_DATETIME) ? metadataObject.getString(Constants.METADATA_KEY_INGESTION_DATETIME) : null; + + JSONObject sourceObject = new JSONObject(); + sourceObject.put("segmentCount", Integer.parseInt(index) + 1); + sourceObject.put(Constants.METADATA_KEY_SOURCE_ID, sourceId); + sourceObject.put(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH, absoluteDirectoryPath); + sourceObject.put(Constants.METADATA_KEY_FULL_PATH, fullPath); + sourceObject.put(Constants.METADATA_KEY_FILE_NAME, fileName); + sourceObject.put(Constants.METADATA_KEY_URL, url); + sourceObject.put(Constants.METADATA_KEY_INGESTION_DATETIME, ingestionDatetime); + + return sourceObject; + } + public static Builder builder() { + return new Builder(); } @@ -255,6 +279,11 @@ public VectorStore build() { embeddingStore = new MilvusVectorStore(storeName, configuration, queryParams, modelParams); break; + case Constants.VECTOR_STORE_PGVECTOR: + + embeddingStore = new PGVectorVectorStore(storeName, configuration, queryParams, modelParams); + break; + case Constants.VECTOR_STORE_AI_SEARCH: case Constants.VECTOR_STORE_CHROMA: @@ -263,8 +292,6 @@ public VectorStore build() { case Constants.VECTOR_STORE_ELASTICSEARCH: - case Constants.VECTOR_STORE_PGVECTOR: - case Constants.VECTOR_STORE_WEAVIATE: embeddingStore = new VectorStore(storeName, configuration, queryParams, modelParams); diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/milvus/MilvusVectorStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/milvus/MilvusVectorStore.java index ee10aa2..f2b2341 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/milvus/MilvusVectorStore.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/milvus/MilvusVectorStore.java @@ -84,29 +84,13 @@ public JSONObject listSources() { hasMore = false; } else { - LOGGER.debug("Number of segments in current batch: " + batchResults.size()); for (QueryResultsWrapper.RowRecord rowRecord : batchResults) { JsonObject gsonObject = (JsonObject)rowRecord.getFieldValues().get("metadata"); - LOGGER.debug(gsonObject.toString()); - JSONObject metadata = new JSONObject(gsonObject.toString()); - - String sourceId = metadata.has(Constants.METADATA_KEY_SOURCE_ID) ? metadata.getString(Constants.METADATA_KEY_SOURCE_ID) : null; - String index = metadata.has(Constants.METADATA_KEY_INDEX) ? metadata.getString(Constants.METADATA_KEY_INDEX) : null; - String fileName = metadata.has(Constants.METADATA_KEY_FILE_NAME) ? metadata.getString(Constants.METADATA_KEY_FILE_NAME) : null; - String url = metadata.has(Constants.METADATA_KEY_URL) ? metadata.getString(Constants.METADATA_KEY_URL) : null; - String fullPath = metadata.has(Constants.METADATA_KEY_FULL_PATH) ? metadata.getString(Constants.METADATA_KEY_FULL_PATH) : null; - String absoluteDirectoryPath = metadata.has(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH) ? metadata.getString(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH) : null; - String ingestionDatetime = metadata.has(Constants.METADATA_KEY_INGESTION_DATETIME) ? metadata.getString(Constants.METADATA_KEY_INGESTION_DATETIME) : null; - - JSONObject sourceObject = new JSONObject(); - sourceObject.put("segmentCount", Integer.parseInt(index) + 1); - sourceObject.put(Constants.METADATA_KEY_SOURCE_ID, sourceId); - sourceObject.put(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH, absoluteDirectoryPath); - sourceObject.put(Constants.METADATA_KEY_FULL_PATH, fullPath); - sourceObject.put(Constants.METADATA_KEY_FILE_NAME, fileName); - sourceObject.put(Constants.METADATA_KEY_URL, url); - sourceObject.put(Constants.METADATA_KEY_INGESTION_DATETIME, ingestionDatetime); + JSONObject metadataObject = new JSONObject(gsonObject.toString()); + + String index = metadataObject.has(Constants.METADATA_KEY_INDEX) ? metadataObject.getString(Constants.METADATA_KEY_INDEX) : null; + JSONObject sourceObject = getSourceObject(metadataObject); String sourceUniqueKey = getSourceUniqueKey(sourceObject); diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/milvus/PGVectorVectorStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/milvus/PGVectorVectorStore.java new file mode 100644 index 0000000..471089e --- /dev/null +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/milvus/PGVectorVectorStore.java @@ -0,0 +1,225 @@ +package org.mule.extension.mulechain.vectors.internal.helper.store.milvus; + +import org.json.JSONArray; +import org.json.JSONObject; +import org.mule.extension.mulechain.vectors.internal.config.Configuration; +import org.mule.extension.mulechain.vectors.internal.constant.Constants; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; +import org.mule.extension.mulechain.vectors.internal.helper.store.VectorStore; +import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; +import org.postgresql.ds.PGSimpleDataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import static org.mule.extension.mulechain.vectors.internal.util.JsonUtils.readConfigFile; + +/** + * Represents a store for vector data using PostgreSQL with PGVector extension. + * This class is responsible for interacting with a PostgreSQL database to store and retrieve vector metadata. + */ +public class PGVectorVectorStore extends VectorStore { + + private static final Logger LOGGER = LoggerFactory.getLogger(PGVectorVectorStore.class); + + private String userName; + private String password; + private String host; + private int port; + private String database; + + /** + * Constructs a PGVectorVectorStore instance using configuration and query parameters. + * + * @param storeName The name of the store. + * @param configuration The configuration for connecting to the store. + * @param queryParams Parameters related to query configurations. + * @param modelParams Parameters related to embedding model. + */ + public PGVectorVectorStore(String storeName, Configuration configuration, QueryParameters queryParams, EmbeddingModelNameParameters modelParams) { + + super(storeName, configuration, queryParams, modelParams); + + JSONObject config = readConfigFile(configuration.getConfigFilePath()); + JSONObject vectorStoreConfig = config.getJSONObject(Constants.VECTOR_STORE_PGVECTOR); + this.host = vectorStoreConfig.getString("POSTGRES_HOST"); + this.port = vectorStoreConfig.getInt("POSTGRES_PORT"); + this.database = vectorStoreConfig.getString("POSTGRES_DATABASE"); + this.userName = vectorStoreConfig.getString("POSTGRES_USER"); + this.password = vectorStoreConfig.getString("POSTGRES_PASSWORD"); + } + + /** + * Lists the sources stored in the PostgreSQL database. + * + * @return A {@link JSONObject} containing the sources and their metadata. + */ + public JSONObject listSources() { + + HashMap sourcesJSONObjectHashMap = new HashMap<>(); + + JSONObject jsonObject = new JSONObject(); + jsonObject.put("storeName", storeName); + + try (PgVectorMetadataIterator iterator = new PgVectorMetadataIterator(userName, password, host, port, database, storeName, (int)queryParams.embeddingPageSize())) { + while (iterator.hasNext()) { + + JSONObject metadataObject = new JSONObject(iterator.next()); + + String index = metadataObject.has(Constants.METADATA_KEY_INDEX) ? metadataObject.getString(Constants.METADATA_KEY_INDEX) : null; + JSONObject sourceObject = getSourceObject(metadataObject); + + String sourceUniqueKey = getSourceUniqueKey(sourceObject); + + // Add sourceObject to sources only if it has at least one key-value pair and it's possible to generate a key + if (!sourceObject.isEmpty() && sourceUniqueKey != null && !sourceUniqueKey.isEmpty()) { + // Overwrite sourceObject if current one has a greater index (greatest index represents the number of segments) + if(sourcesJSONObjectHashMap.containsKey(sourceUniqueKey)){ + // Get current index + int currentSegmentCount = Integer.parseInt(index) + 1; + // Get previously stored index + int storedSegmentCount = (int) sourcesJSONObjectHashMap.get(sourceUniqueKey).get("segmentCount"); + // Check if object need to be updated + if(currentSegmentCount > storedSegmentCount) { + sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject); + } + } else { + sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject); + } + } + } + } catch (SQLException e) { + LOGGER.error("Error while listing sources", e); + } + + jsonObject.put("sources", JsonUtils.jsonObjectCollectionToJsonArray(sourcesJSONObjectHashMap.values())); + jsonObject.put("sourceCount", sourcesJSONObjectHashMap.size()); + + return jsonObject; + } + + /** + * Iterator to handle metadata pagination from the PostgreSQL database. + */ + public class PgVectorMetadataIterator implements Iterator, AutoCloseable { + + private int offset = 0; // Current offset for pagination + private ResultSet resultSet; + private PreparedStatement pstmt; + private Connection connection; + private String table; + int pageSize; + + /** + * Constructs a PgVectorMetadataIterator for fetching metadata from the database in pages. + * + * @param userName The username for database access. + * @param password The password for database access. + * @param host The PostgreSQL host. + * @param port The PostgreSQL port. + * @param database The name of the database. + * @param table The table to fetch metadata from. + * @param pageSize The number of rows per page for pagination. + * @throws SQLException If a database error occurs. + */ + public PgVectorMetadataIterator(String userName, String password, String host, int port, String database, String table, int pageSize) throws SQLException { + + // Initialize the connection and the first page of data + PGSimpleDataSource dataSource = new PGSimpleDataSource(); + dataSource.setServerNames(new String[]{host}); + dataSource.setPortNumbers(new int[]{port}); + dataSource.setDatabaseName(database); + dataSource.setUser(userName); + dataSource.setPassword(password); + + connection = dataSource.getConnection(); + + this.table = table; + this.pageSize = pageSize; + + fetchNextPage(); + } + + /** + * Fetches the next page of metadata from the database. + * + * @throws SQLException If a database error occurs. + */ + private void fetchNextPage() throws SQLException { + if (pstmt != null) { + pstmt.close(); + } + + String query = "SELECT metadata FROM " + table + " LIMIT ? OFFSET ?"; + pstmt = connection.prepareStatement(query); + pstmt.setInt(1, this.pageSize); + pstmt.setInt(2, offset); + resultSet = pstmt.executeQuery(); + offset += pageSize; + } + + /** + * Checks if there are more elements in the result set. + * + * @return {@code true} if there are more elements, {@code false} otherwise. + */ + @Override + public boolean hasNext() { + try { + // Check if the resultSet has more rows or fetch the next page + if (resultSet != null && resultSet.next()) { + return true; + } else { + fetchNextPage(); // Fetch the next page if the current page is exhausted + return resultSet != null && resultSet.next(); + } + } catch (SQLException e) { + LOGGER.error("Error checking for next element", e); + return false; + } + } + + /** + * Returns the next metadata element in the result set. + * + * @return The next metadata element as a {@code String}. + * @throws NoSuchElementException If no more elements are available. + */ + @Override + public String next() { + try { + if (resultSet == null) { + throw new NoSuchElementException("No more elements available"); + } + return resultSet.getString("metadata"); + } catch (SQLException e) { + LOGGER.error("Error retrieving next element", e); + throw new NoSuchElementException("Error retrieving next element"); + } + } + + /** + * Closes the iterator and releases the resources. + */ + public void close() { + try { + if (resultSet != null) + resultSet.close(); + if (pstmt != null) + pstmt.close(); + if (connection != null) + connection.close(); + } catch (SQLException e) { + LOGGER.error("Error closing resources", e); + } + } + } +}