Skip to content

Commit

Permalink
Add benchmark support for vector radial search
Browse files Browse the repository at this point in the history
Signed-off-by: Junqiu Lei <[email protected]>
  • Loading branch information
junqiu-lei committed Jun 12, 2024
1 parent c29fb2f commit 468ff45
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 24 deletions.
8 changes: 8 additions & 0 deletions osbenchmark/utils/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class Context(Enum):
INDEX = 1
QUERY = 2
NEIGHBORS = 3
MAX_DISTANCE_NEIGHBORS = 4
MIN_SCORE_NEIGHBORS = 5


class DataSet(ABC):
Expand Down Expand Up @@ -141,6 +143,12 @@ def parse_context(context: Context) -> str:
if context == Context.QUERY:
return "test"

if context == Context.MAX_DISTANCE_NEIGHBORS:
return "max_distance_neighbors"

if context == Context.MIN_SCORE_NEIGHBORS:
return "min_score_neighbors"

raise Exception("Unsupported context")


Expand Down
48 changes: 35 additions & 13 deletions osbenchmark/worker_coordinator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,10 @@ async def _vector_search_query_with_recall(opensearch, params):
"success": True,
"recall@k": 0,
"recall@1": 0,
"recall@min_score": 0,
"recall@min_score_1": 0,
"recall@max_distance": 0,
"recall@max_distance_1": 0,
}

def _is_empty_search_results(content):
Expand All @@ -1080,23 +1084,33 @@ def _get_field_value(content, field_name):
return _get_field_value(content["_source"], field_name)
return None

def calculate_recall(predictions, neighbors, top_k):
def calculate_recall(predictions, neighbors, top_1_recall=False):
"""
Calculates the recall by comparing top_k neighbors with predictions.
recall = Sum of matched neighbors from predictions / total number of neighbors from ground truth
Args:
predictions: list containing ids of results returned by OpenSearch.
neighbors: list containing ids of the actual neighbors for a set of queries
top_k: number of top results to check from the neighbors and should be greater than zero
top_1_recall: boolean to calculate recall@1
Returns:
Recall between predictions and top k neighbors from ground truth
"""
correct = 0.0
if neighbors is None:
try:
n = neighbors.index('-1')
# Slice the list to have a length of n
truth_set = neighbors[:n]
except ValueError:
# If '-1' is not found in the list, use the entire list
truth_set = neighbors
min_num_of_results = len(truth_set)
if min_num_of_results == 0:
self.logger.info("No neighbors are provided for recall calculation")
return 0.0
min_num_of_results = min(top_k, len(neighbors))
truth_set = neighbors[:min_num_of_results]
return 1

if top_1_recall:
min_num_of_results = 1

for j in range(min_num_of_results):
if j >= len(predictions):
self.logger.info("No more neighbors in prediction to compare against ground truth.\n"
Expand All @@ -1106,10 +1120,11 @@ def calculate_recall(predictions, neighbors, top_k):
if predictions[j] in truth_set:
correct += 1.0

return correct / min_num_of_results
return float(correct) / min_num_of_results

doc_type = params.get("type")
response = await self._raw_search(opensearch, doc_type, index, body, request_params, headers=headers)

recall_processing_start = time.perf_counter()
if detailed_results:
props = parse(response, ["hits.total", "hits.total.value", "hits.total.relation", "timed_out", "took"])
Expand Down Expand Up @@ -1137,12 +1152,19 @@ def calculate_recall(predictions, neighbors, top_k):
continue
candidates.append(field_value)
neighbors_dataset = params["neighbors"]
num_neighbors = params.get("k", 1)
recall_k = calculate_recall(candidates, neighbors_dataset, num_neighbors)
result.update({"recall@k": recall_k})

recall_1 = calculate_recall(candidates, neighbors_dataset, 1)
result.update({"recall@1": recall_1})
recall_threshold = calculate_recall(candidates, neighbors_dataset)
recall_top_1 = calculate_recall(candidates, neighbors_dataset, True)
max_distance = params.get("max_distance")
min_score = params.get("min_score")
if min_score:
result.update({"recall@min_score": recall_threshold})
result.update({"recall@min_score_1": recall_top_1})
elif max_distance:
result.update({"recall@max_distance": recall_threshold})
result.update({"recall@max_distance_1": recall_top_1})
else:
result.update({"recall@k": recall_threshold})
result.update({"recall@1": recall_top_1})

recall_processing_end = time.perf_counter()
recall_processing_time = convert.seconds_to_ms(recall_processing_end - recall_processing_start)
Expand Down
84 changes: 73 additions & 11 deletions osbenchmark/workload/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from osbenchmark import exceptions
from osbenchmark.utils import io
from osbenchmark.utils.dataset import DataSet, get_data_set, Context
from osbenchmark.utils.parse import parse_string_parameter, parse_int_parameter
from osbenchmark.utils.parse import parse_string_parameter, parse_int_parameter, parse_float_parameter
from osbenchmark.workload import workload

__PARAM_SOURCES_BY_OP = {}
Expand Down Expand Up @@ -1027,6 +1027,8 @@ class VectorSearchPartitionParamSource(VectorDataSetPartitionParamSource):
request-params: query parameters that can be passed to search request
"""
PARAMS_NAME_K = "k"
PARAMS_NAME_MAX_DISTANCE = "max_distance"
PARAMS_NAME_MIN_SCORE = "min_score"
PARAMS_NAME_BODY = "body"
PARAMS_NAME_SIZE = "size"
PARAMS_NAME_QUERY = "query"
Expand All @@ -1041,11 +1043,26 @@ class VectorSearchPartitionParamSource(VectorDataSetPartitionParamSource):
PARAMS_NAME_REQUEST_PARAMS = "request-params"
PARAMS_NAME_SOURCE = "_source"
PARAMS_NAME_ALLOW_PARTIAL_RESULTS = "allow_partial_search_results"
MIN_SCORE_QUERY_TYPE = "min_score"
MAX_DISTANCE_QUERY_TYPE = "max_distance"
KNN_QUERY_TYPE = "knn"

def __init__(self, workloads, params, query_params, **kwargs):
super().__init__(workloads, params, Context.QUERY, **kwargs)
self.logger = logging.getLogger(__name__)
self.k = parse_int_parameter(self.PARAMS_NAME_K, params)
self.k = None
self.distance = None
self.score = None
if self.PARAMS_NAME_K in params:
self.k = parse_int_parameter(self.PARAMS_NAME_K, params)
self.query_type = self.KNN_QUERY_TYPE
if self.PARAMS_NAME_MAX_DISTANCE in params:
self.distance = parse_float_parameter(self.PARAMS_NAME_MAX_DISTANCE, params)
self.query_type = self.MAX_DISTANCE_QUERY_TYPE
if self.PARAMS_NAME_MIN_SCORE in params:
self.score = parse_float_parameter(self.PARAMS_NAME_MIN_SCORE, params)
self.query_type = self.MIN_SCORE_QUERY_TYPE
self.logger.info("query type is set up to %s", self.query_type)
self.repetitions = parse_int_parameter(self.PARAMS_NAME_REPETITIONS, params, 1)
self.current_rep = 1
self.neighbors_data_set_format = parse_string_parameter(
Expand All @@ -1058,10 +1075,21 @@ def __init__(self, workloads, params, query_params, **kwargs):
self.PARAMS_VALUE_VECTOR_SEARCH)
self.query_params = query_params
self.query_params.update({
self.PARAMS_NAME_K: self.k,
self.PARAMS_NAME_OPERATION_TYPE: operation_type,
self.PARAMS_NAME_ID_FIELD_NAME: params.get(self.PARAMS_NAME_ID_FIELD_NAME),
})
if self.PARAMS_NAME_K in params:
self.query_params.update({
self.PARAMS_NAME_K: self.k
})
if self.PARAMS_NAME_MAX_DISTANCE in params:
self.query_params.update({
self.PARAMS_NAME_MAX_DISTANCE: self.distance
})
if self.PARAMS_NAME_MIN_SCORE in params:
self.query_params.update({
self.PARAMS_NAME_MIN_SCORE: self.score
})
if self.PARAMS_NAME_FILTER in params:
self.query_params.update({
self.PARAMS_NAME_FILTER: params.get(self.PARAMS_NAME_FILTER)
Expand All @@ -1086,15 +1114,29 @@ def _update_request_params(self):
self.PARAMS_NAME_ALLOW_PARTIAL_RESULTS, "false")
self.query_params.update({self.PARAMS_NAME_REQUEST_PARAMS: request_params})

def _get_query_neighbors(self):
if self.query_type == self.KNN_QUERY_TYPE:
return Context.NEIGHBORS
elif self.query_type == self.MIN_SCORE_QUERY_TYPE:
return Context.MIN_SCORE_NEIGHBORS
elif self.query_type == self.MAX_DISTANCE_QUERY_TYPE:
return Context.MAX_DISTANCE_NEIGHBORS
else:
raise exceptions.InvalidSyntax("Unknown query type [%s]" % self.query_type)

def _update_body_params(self, vector):
# accept body params if passed from workload, else, create empty dictionary
body_params = self.query_params.get(self.PARAMS_NAME_BODY) or dict()
if self.PARAMS_NAME_SIZE not in body_params:
body_params[self.PARAMS_NAME_SIZE] = self.k
if self.query_type == self.KNN_QUERY_TYPE:
body_params[self.PARAMS_NAME_SIZE] = self.k
else:
# if distance is set, set size to 10000, which is the maximum number results returned by default
body_params[self.PARAMS_NAME_SIZE] = 10000
if self.PARAMS_NAME_QUERY in body_params:
self.logger.warning(
"[%s] param from body will be replaced with vector search query.", self.PARAMS_NAME_QUERY)
efficient_filter=self.query_params.get(self.PARAMS_NAME_FILTER)
efficient_filter = self.query_params.get(self.PARAMS_NAME_FILTER)
# override query params with vector search query
body_params[self.PARAMS_NAME_QUERY] = self._build_vector_search_query_body(vector, efficient_filter)
self.query_params.update({self.PARAMS_NAME_BODY: body_params})
Expand All @@ -1110,7 +1152,7 @@ def partition(self, partition_index, total_partitions):
self.neighbors_data_set_path = self.data_set_path
# add neighbor instance to partition
partition.neighbors_data_set = get_data_set(
self.neighbors_data_set_format, self.neighbors_data_set_path, Context.NEIGHBORS)
self.neighbors_data_set_format, self.neighbors_data_set_path, self._get_query_neighbors())
partition.neighbors_data_set.seek(partition.offset)
return partition

Expand All @@ -1129,7 +1171,10 @@ def params(self):
raise StopIteration
vector = self.data_set.read(1)[0]
neighbor = self.neighbors_data_set.read(1)[0]
true_neighbors = list(map(str, neighbor[:self.k]))
if self.k:
true_neighbors = list(map(str, neighbor[:self.k]))
else:
true_neighbors = list(map(str, neighbor))
self.query_params.update({
"neighbors": true_neighbors,
})
Expand All @@ -1140,17 +1185,34 @@ def params(self):
return self.query_params

def _build_vector_search_query_body(self, vector, efficient_filter=None) -> dict:
"""Builds a k-NN request that can be used to execute an approximate nearest
"""Builds a vector search request that can be used to execute an approximate nearest
neighbor search against a k-NN plugin index
Args:
vector: vector used for query
efficient_filter: efficient filter used for query
Returns:
A dictionary containing the body used for search query
"""
query = {
query = {}
if self.query_type == self.MAX_DISTANCE_QUERY_TYPE:
query.update({
"max_distance": self.distance,
})
elif self.query_type == self.MIN_SCORE_QUERY_TYPE:
query.update({
"min_score": self.score,
})
elif self.query_type == self.KNN_QUERY_TYPE:
query.update({
"k": self.k,
})
else:
raise exceptions.InvalidSyntax("Unknown query type [%s]" % self.query_type)

query.update({
"vector": vector,
"k": self.k,
}
})

if efficient_filter:
query.update({
"filter": efficient_filter,
Expand Down
Loading

0 comments on commit 468ff45

Please sign in to comment.