Commit

Merge pull request #25 from MuleSoft-AI-Chain-Project/bugfix/list-sources

Bugfix/list sources
  • Loading branch information
tbolis-at-mulesoft authored Nov 10, 2024
2 parents 1d3005a + aabf6b1 commit b09f4f7
Showing 5 changed files with 260 additions and 24 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.mule.mulechain</groupId>
<artifactId>mulechain-vectors</artifactId>
<version>0.1.85-SNAPSHOT</version>
<version>0.1.86-SNAPSHOT</version>
<packaging>mule-extension</packaging>
<name>MAC Vectors</name>

@@ -74,7 +74,7 @@ public class EmbeddingOperationValidator {

EMBEDDING_OPERATION_TYPE_TO_SUPPORTED_VECTOR_STORES.put(Constants.EMBEDDING_OPERATION_TYPE_QUERY_ALL,
new HashSet<>(Arrays.asList(
// Constants.VECTOR_STORE_PGVECTOR, // Needs to be tested
Constants.VECTOR_STORE_PGVECTOR,
// Constants.VECTOR_STORE_ELASTICSEARCH, // Needs to be tested
Constants.VECTOR_STORE_MILVUS
// Constants.VECTOR_STORE_CHROMA, // Needs to be tested
@@ -17,6 +17,7 @@
import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters;
import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters;
import org.mule.extension.mulechain.vectors.internal.helper.store.milvus.MilvusVectorStore;
import org.mule.extension.mulechain.vectors.internal.helper.store.milvus.PGVectorVectorStore;
import org.mule.extension.mulechain.vectors.internal.util.JsonUtils;
import org.mule.runtime.module.extension.internal.runtime.operation.IllegalOperationException;
import org.slf4j.Logger;
@@ -203,7 +204,30 @@ protected String getSourceUniqueKey(JSONObject sourceObject) {
return !sourceId.isEmpty() ? sourceId : alternativeKey;
}

protected JSONObject getSourceObject(JSONObject metadataObject) {

String sourceId = metadataObject.has(Constants.METADATA_KEY_SOURCE_ID) ? metadataObject.getString(Constants.METADATA_KEY_SOURCE_ID) : null;
String index = metadataObject.has(Constants.METADATA_KEY_INDEX) ? metadataObject.getString(Constants.METADATA_KEY_INDEX) : null;
String fileName = metadataObject.has(Constants.METADATA_KEY_FILE_NAME) ? metadataObject.getString(Constants.METADATA_KEY_FILE_NAME) : null;
String url = metadataObject.has(Constants.METADATA_KEY_URL) ? metadataObject.getString(Constants.METADATA_KEY_URL) : null;
String fullPath = metadataObject.has(Constants.METADATA_KEY_FULL_PATH) ? metadataObject.getString(Constants.METADATA_KEY_FULL_PATH) : null;
String absoluteDirectoryPath = metadataObject.has(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH) ? metadataObject.getString(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH) : null;
String ingestionDatetime = metadataObject.has(Constants.METADATA_KEY_INGESTION_DATETIME) ? metadataObject.getString(Constants.METADATA_KEY_INGESTION_DATETIME) : null;

JSONObject sourceObject = new JSONObject();
sourceObject.put("segmentCount", Integer.parseInt(index) + 1);
sourceObject.put(Constants.METADATA_KEY_SOURCE_ID, sourceId);
sourceObject.put(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH, absoluteDirectoryPath);
sourceObject.put(Constants.METADATA_KEY_FULL_PATH, fullPath);
sourceObject.put(Constants.METADATA_KEY_FILE_NAME, fileName);
sourceObject.put(Constants.METADATA_KEY_URL, url);
sourceObject.put(Constants.METADATA_KEY_INGESTION_DATETIME, ingestionDatetime);

return sourceObject;
}
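// Illustration (assumption, not taken from this commit): if the Constants.METADATA_KEY_* constants
// resolve to keys such as "index" and "file_name", a metadata object like
// {"index": "12", "file_name": "report.pdf", "full_path": "/data/report.pdf", ...}
// yields a sourceObject carrying the same fields plus "segmentCount": 13 (index + 1).
// The index value is assumed to be present and numeric when segmentCount is computed.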

public static Builder builder() {

return new Builder();
}

@@ -255,6 +279,11 @@ public VectorStore build() {
embeddingStore = new MilvusVectorStore(storeName, configuration, queryParams, modelParams);
break;

case Constants.VECTOR_STORE_PGVECTOR:

embeddingStore = new PGVectorVectorStore(storeName, configuration, queryParams, modelParams);
break;

case Constants.VECTOR_STORE_AI_SEARCH:

case Constants.VECTOR_STORE_CHROMA:
@@ -263,8 +292,6 @@ public VectorStore build() {

case Constants.VECTOR_STORE_ELASTICSEARCH:

case Constants.VECTOR_STORE_PGVECTOR:

case Constants.VECTOR_STORE_WEAVIATE:

embeddingStore = new VectorStore(storeName, configuration, queryParams, modelParams);
@@ -84,29 +84,13 @@ public JSONObject listSources() {
hasMore = false;
} else {

LOGGER.debug("Number of segments in current batch: " + batchResults.size());
for (QueryResultsWrapper.RowRecord rowRecord : batchResults) {

JsonObject gsonObject = (JsonObject)rowRecord.getFieldValues().get("metadata");
LOGGER.debug(gsonObject.toString());
JSONObject metadata = new JSONObject(gsonObject.toString());

String sourceId = metadata.has(Constants.METADATA_KEY_SOURCE_ID) ? metadata.getString(Constants.METADATA_KEY_SOURCE_ID) : null;
String index = metadata.has(Constants.METADATA_KEY_INDEX) ? metadata.getString(Constants.METADATA_KEY_INDEX) : null;
String fileName = metadata.has(Constants.METADATA_KEY_FILE_NAME) ? metadata.getString(Constants.METADATA_KEY_FILE_NAME) : null;
String url = metadata.has(Constants.METADATA_KEY_URL) ? metadata.getString(Constants.METADATA_KEY_URL) : null;
String fullPath = metadata.has(Constants.METADATA_KEY_FULL_PATH) ? metadata.getString(Constants.METADATA_KEY_FULL_PATH) : null;
String absoluteDirectoryPath = metadata.has(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH) ? metadata.getString(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH) : null;
String ingestionDatetime = metadata.has(Constants.METADATA_KEY_INGESTION_DATETIME) ? metadata.getString(Constants.METADATA_KEY_INGESTION_DATETIME) : null;

JSONObject sourceObject = new JSONObject();
sourceObject.put("segmentCount", Integer.parseInt(index) + 1);
sourceObject.put(Constants.METADATA_KEY_SOURCE_ID, sourceId);
sourceObject.put(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH, absoluteDirectoryPath);
sourceObject.put(Constants.METADATA_KEY_FULL_PATH, fullPath);
sourceObject.put(Constants.METADATA_KEY_FILE_NAME, fileName);
sourceObject.put(Constants.METADATA_KEY_URL, url);
sourceObject.put(Constants.METADATA_KEY_INGESTION_DATETIME, ingestionDatetime);
JSONObject metadataObject = new JSONObject(gsonObject.toString());

String index = metadataObject.has(Constants.METADATA_KEY_INDEX) ? metadataObject.getString(Constants.METADATA_KEY_INDEX) : null;
JSONObject sourceObject = getSourceObject(metadataObject);

String sourceUniqueKey = getSourceUniqueKey(sourceObject);

@@ -0,0 +1,225 @@
package org.mule.extension.mulechain.vectors.internal.helper.store.milvus;

import org.json.JSONArray;
import org.json.JSONObject;
import org.mule.extension.mulechain.vectors.internal.config.Configuration;
import org.mule.extension.mulechain.vectors.internal.constant.Constants;
import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters;
import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters;
import org.mule.extension.mulechain.vectors.internal.helper.store.VectorStore;
import org.mule.extension.mulechain.vectors.internal.util.JsonUtils;
import org.postgresql.ds.PGSimpleDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.NoSuchElementException;

import static org.mule.extension.mulechain.vectors.internal.util.JsonUtils.readConfigFile;

/**
* Represents a store for vector data using PostgreSQL with PGVector extension.
* This class is responsible for interacting with a PostgreSQL database to store and retrieve vector metadata.
*/
public class PGVectorVectorStore extends VectorStore {

private static final Logger LOGGER = LoggerFactory.getLogger(PGVectorVectorStore.class);

private String userName;
private String password;
private String host;
private int port;
private String database;

/**
* Constructs a PGVectorVectorStore instance using configuration and query parameters.
*
* @param storeName The name of the store.
* @param configuration The configuration for connecting to the store.
* @param queryParams Parameters related to query configurations.
* @param modelParams Parameters related to embedding model.
*/
public PGVectorVectorStore(String storeName, Configuration configuration, QueryParameters queryParams, EmbeddingModelNameParameters modelParams) {

super(storeName, configuration, queryParams, modelParams);

JSONObject config = readConfigFile(configuration.getConfigFilePath());
JSONObject vectorStoreConfig = config.getJSONObject(Constants.VECTOR_STORE_PGVECTOR);
this.host = vectorStoreConfig.getString("POSTGRES_HOST");
this.port = vectorStoreConfig.getInt("POSTGRES_PORT");
this.database = vectorStoreConfig.getString("POSTGRES_DATABASE");
this.userName = vectorStoreConfig.getString("POSTGRES_USER");
this.password = vectorStoreConfig.getString("POSTGRES_PASSWORD");
}
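// For illustration only (not part of this commit): readConfigFile(...) above is expected to return
// a JSON document with a section keyed by Constants.VECTOR_STORE_PGVECTOR holding the five
// connection properties read here. Assuming that constant resolves to "PGVECTOR", the section
// would look roughly like this (host, database, user and password values are placeholders):
//
//   "PGVECTOR": {
//     "POSTGRES_HOST": "localhost",
//     "POSTGRES_PORT": 5432,
//     "POSTGRES_DATABASE": "vectors",
//     "POSTGRES_USER": "mule",
//     "POSTGRES_PASSWORD": "********"
//   }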

/**
* Lists the sources stored in the PostgreSQL database.
*
* @return A {@link JSONObject} containing the sources and their metadata.
*/
public JSONObject listSources() {

HashMap<String, JSONObject> sourcesJSONObjectHashMap = new HashMap<>();

JSONObject jsonObject = new JSONObject();
jsonObject.put("storeName", storeName);

try (PgVectorMetadataIterator iterator = new PgVectorMetadataIterator(userName, password, host, port, database, storeName, (int)queryParams.embeddingPageSize())) {
while (iterator.hasNext()) {

JSONObject metadataObject = new JSONObject(iterator.next());

String index = metadataObject.has(Constants.METADATA_KEY_INDEX) ? metadataObject.getString(Constants.METADATA_KEY_INDEX) : null;
JSONObject sourceObject = getSourceObject(metadataObject);

String sourceUniqueKey = getSourceUniqueKey(sourceObject);

// Add sourceObject to sources only if it has at least one key-value pair and it's possible to generate a key
if (!sourceObject.isEmpty() && sourceUniqueKey != null && !sourceUniqueKey.isEmpty()) {
// Overwrite sourceObject if current one has a greater index (greatest index represents the number of segments)
if(sourcesJSONObjectHashMap.containsKey(sourceUniqueKey)){
// Get current index
int currentSegmentCount = Integer.parseInt(index) + 1;
// Get previously stored index
int storedSegmentCount = (int) sourcesJSONObjectHashMap.get(sourceUniqueKey).get("segmentCount");
// Check if the stored object needs to be updated
if(currentSegmentCount > storedSegmentCount) {
sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject);
}
} else {
sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject);
}
}
}
} catch (SQLException e) {
LOGGER.error("Error while listing sources", e);
}

jsonObject.put("sources", JsonUtils.jsonObjectCollectionToJsonArray(sourcesJSONObjectHashMap.values()));
jsonObject.put("sourceCount", sourcesJSONObjectHashMap.size());

return jsonObject;
}
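// Shape of the JSONObject returned above (values are placeholders):
//
//   {
//     "storeName": "my_store",
//     "sourceCount": 2,
//     "sources": [ { "segmentCount": 13, "source_id": "...", "file_name": "...", ... }, ... ]
//   }
//
// The keys inside each source entry come from Constants.METADATA_KEY_*; their literal names are
// not shown in this diff, so "source_id" and "file_name" above are assumptions.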

/**
* Iterator to handle metadata pagination from the PostgreSQL database.
*/
public class PgVectorMetadataIterator implements Iterator<String>, AutoCloseable {

private int offset = 0; // Current offset for pagination
private ResultSet resultSet;
private PreparedStatement pstmt;
private Connection connection;
private String table;
int pageSize;

/**
* Constructs a PgVectorMetadataIterator for fetching metadata from the database in pages.
*
* @param userName The username for database access.
* @param password The password for database access.
* @param host The PostgreSQL host.
* @param port The PostgreSQL port.
* @param database The name of the database.
* @param table The table to fetch metadata from.
* @param pageSize The number of rows per page for pagination.
* @throws SQLException If a database error occurs.
*/
public PgVectorMetadataIterator(String userName, String password, String host, int port, String database, String table, int pageSize) throws SQLException {

// Initialize the connection and the first page of data
PGSimpleDataSource dataSource = new PGSimpleDataSource();
dataSource.setServerNames(new String[]{host});
dataSource.setPortNumbers(new int[]{port});
dataSource.setDatabaseName(database);
dataSource.setUser(userName);
dataSource.setPassword(password);

connection = dataSource.getConnection();

this.table = table;
this.pageSize = pageSize;

fetchNextPage();
}

/**
* Fetches the next page of metadata from the database.
*
* @throws SQLException If a database error occurs.
*/
private void fetchNextPage() throws SQLException {
if (pstmt != null) {
pstmt.close();
}

String query = "SELECT metadata FROM " + table + " LIMIT ? OFFSET ?";
pstmt = connection.prepareStatement(query);
pstmt.setInt(1, this.pageSize);
pstmt.setInt(2, offset);
resultSet = pstmt.executeQuery();
offset += pageSize;
}

/**
* Checks if there are more elements in the result set.
*
* @return {@code true} if there are more elements, {@code false} otherwise.
*/
@Override
public boolean hasNext() {
try {
// Check if the resultSet has more rows or fetch the next page
if (resultSet != null && resultSet.next()) {
return true;
} else {
fetchNextPage(); // Fetch the next page if the current page is exhausted
return resultSet != null && resultSet.next();
}
} catch (SQLException e) {
LOGGER.error("Error checking for next element", e);
return false;
}
}

/**
* Returns the next metadata element in the result set.
*
* @return The next metadata element as a {@code String}.
* @throws NoSuchElementException If no more elements are available.
*/
@Override
public String next() {
try {
if (resultSet == null) {
throw new NoSuchElementException("No more elements available");
}
return resultSet.getString("metadata");
} catch (SQLException e) {
LOGGER.error("Error retrieving next element", e);
throw new NoSuchElementException("Error retrieving next element");
}
}

/**
* Closes the iterator and releases the resources.
*/
public void close() {
try {
if (resultSet != null)
resultSet.close();
if (pstmt != null)
pstmt.close();
if (connection != null)
connection.close();
} catch (SQLException e) {
LOGGER.error("Error closing resources", e);
}
}
}
}

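For reference, PgVectorMetadataIterator pages through the store's table with a plain LIMIT/OFFSET query, so a listSources() call ends up issuing a sequence of statements of the form below (a page size of 500 and a table named my_store are placeholders; the real values come from queryParams.embeddingPageSize() and the store name):

SELECT metadata FROM my_store LIMIT 500 OFFSET 0
SELECT metadata FROM my_store LIMIT 500 OFFSET 500
SELECT metadata FROM my_store LIMIT 500 OFFSET 1000

hasNext() returns false once a page comes back empty, and listSources() aggregates the returned metadata rows into one source entry per unique source key, keeping the highest segment index seen as that source's segmentCount.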