diff --git a/ann_benchmarks/algorithms/tsvector/config.yml b/ann_benchmarks/algorithms/tsvector/config.yml index 5871aa7ac..fc9d90dc4 100644 --- a/ann_benchmarks/algorithms/tsvector/config.yml +++ b/ann_benchmarks/algorithms/tsvector/config.yml @@ -16,7 +16,7 @@ float: [1], # use_bq [0], # pq_vector_length ] - query_args: [[75]] # query_search_list_size + query_args: [[75, 150, 200, 300, 400, 500, 600, 700, 800], [75, 150, 200, 300, 400]] # query_search_list_size #tsvector2: # args: [[50], [10], [1.2]] # query_args: [[10, 50, 50]] diff --git a/ann_benchmarks/algorithms/tsvector/module.py b/ann_benchmarks/algorithms/tsvector/module.py index 7f300d5f7..6c8e6a7d9 100644 --- a/ann_benchmarks/algorithms/tsvector/module.py +++ b/ann_benchmarks/algorithms/tsvector/module.py @@ -19,7 +19,6 @@ #QUERY = """with x as materialized (select id, embedding <=> %s as distance from public.items order by 2 limit 100) select id from x order by distance limit %s""" CONNECTION_SETTINGS = [ - "set tsv.query_rescore = 25;", "set work_mem = '2GB';", "set maintenance_work_mem = '8GB';" "set max_parallel_workers_per_gather = 0;", @@ -49,6 +48,8 @@ def __init__(self, metric: str, connection_str: str, num_neighbors: int, search_ self._use_bq: bool = (use_bq == 1) self._pq_vector_length: int = pq_vector_length self._query_search_list_size: Optional[int] = None + self._query_rescore: Optional[int] = None + self._query_shared_buffers: int = 0 self._pool : ConnectionPool = None if metric == "angular": self._query: str = QUERY @@ -74,6 +75,9 @@ def configure(conn): if self._query_search_list_size is not None: conn.execute("set tsv.query_search_list_size = %d" % self._query_search_list_size) print("set tsv.query_search_list_size = %d" % self._query_search_list_size) + if self._query_rescore is not None: + conn.execute("set tsv.query_rescore = %d" % self._query_rescore) + print("set tsv.query_rescore = %d" % self._query_rescore) for setting in CONNECTION_SETTINGS: conn.execute(setting) conn.commit() @@ 
-85,6 +89,24 @@ def does_table_exist(self, conn: psycopg.Connection) -> bool: cur.execute("select count(*) from pg_class where relname = 'items'") table_count = cur.fetchone()[0] return table_count > 0 + + def shared_buffers(self, conn: psycopg.Connection) -> int: + shared_buffers = 0 + with conn.cursor() as cur: + sql_query = QUERY % ("$1", "$2") + cur.execute(f""" + select + shared_blks_hit + + shared_blks_read + from pg_stat_statements + where queryid = (select queryid + from pg_stat_statements + where userid = (select oid from pg_authid where rolname = current_role) + and query like '{sql_query}' + );""") + res = cur.fetchone() + if res is not None: + shared_buffers = res[0] + return shared_buffers def create_table(self, conn: psycopg.Connection, dimensions: int) -> None: with conn.cursor() as cur: @@ -276,8 +298,9 @@ def fit(self, X: numpy.array) -> None: if len(chunks) > 0: self.index_chunks(chunks) - def set_query_arguments(self, query_search_list_size): + def set_query_arguments(self, query_search_list_size, query_rescore): self._query_search_list_size = query_search_list_size + self._query_rescore = query_rescore #close and restart the pool to apply the new settings self._pool.close() self._pool = None @@ -297,6 +320,10 @@ def query(self, q: numpy.array, n: int) -> tuple[numpy.array, float]: def batch_query(self, X: numpy.array, n: int) -> None: threads = min(MAX_BATCH_QUERY_THREADS, X.size) + + with self._pool.connection() as conn: + shared_buffers_start = self.shared_buffers(conn) + results = numpy.empty((X.shape[0], n), dtype=int) latencies = numpy.empty(X.shape[0], dtype=float) with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: @@ -312,6 +339,14 @@ def batch_query(self, X: numpy.array, n: int) -> None: self.results = results self.latencies = latencies + with self._pool.connection() as conn: + shared_buffers_end = self.shared_buffers(conn) + + self._query_shared_buffers = shared_buffers_end - shared_buffers_start + + def 
get_additional(self): + return {"shared_buffers": self._query_shared_buffers} + def get_batch_results(self) -> numpy.array: return self.results @@ -319,4 +354,4 @@ def get_batch_latencies(self) -> numpy.array: return self.latencies def __str__(self): - return f"TSVector(num_neighbors={self._num_neighbors}, search_list_size={self._search_list_size}, max_alpha={self._max_alpha}, use_bq={self._use_bq}, pq_vector_length={self._pq_vector_length}, query_search_list_size={self._query_search_list_size})" + return f"TSVector(num_neighbors={self._num_neighbors}, search_list_size={self._search_list_size}, max_alpha={self._max_alpha}, use_bq={self._use_bq}, pq_vector_length={self._pq_vector_length}, query_search_list_size={self._query_search_list_size}, query_rescore={self._query_rescore})" diff --git a/ann_benchmarks/plotting/metrics.py b/ann_benchmarks/plotting/metrics.py index a086f7c8f..5dd0c57d3 100644 --- a/ann_benchmarks/plotting/metrics.py +++ b/ann_benchmarks/plotting/metrics.py @@ -27,7 +27,8 @@ def knn(dataset_distances, run_distances, count, metrics, epsilon=1e-3): if "knn" not in metrics: print("Computing knn metrics") knn_metrics = metrics.create_group("knn") - mean, std, recalls = get_recall_values(dataset_distances, run_distances, count, knn_threshold, epsilon) + mean, std, recalls = get_recall_values( + dataset_distances, run_distances, count, knn_threshold, epsilon) knn_metrics.attrs["mean"] = mean knn_metrics.attrs["std"] = std knn_metrics["recalls"] = recalls @@ -41,7 +42,8 @@ def epsilon(dataset_distances, run_distances, count, metrics, epsilon=0.01): if s not in metrics: print("Computing epsilon metrics") epsilon_metrics = metrics.create_group(s) - mean, std, recalls = get_recall_values(dataset_distances, run_distances, count, epsilon_threshold, epsilon) + mean, std, recalls = get_recall_values( + dataset_distances, run_distances, count, epsilon_threshold, epsilon) epsilon_metrics.attrs["mean"] = mean epsilon_metrics.attrs["std"] = std 
epsilon_metrics["recalls"] = recalls @@ -61,7 +63,8 @@ def rel(dataset_distances, run_distances, metrics): if total_closest_distance < 0.01: metrics.attrs["rel"] = float("inf") else: - metrics.attrs["rel"] = total_candidate_distance / total_closest_distance + metrics.attrs["rel"] = total_candidate_distance / \ + total_closest_distance else: print("Found cached result") return metrics.attrs["rel"] @@ -214,4 +217,11 @@ def dist_computations(queries, attrs): / queries_per_second(true_distances, run_attrs), # noqa "worst": float("inf"), }, + "shared_buffers": { + "description": "Shared Buffers", + "function": lambda true_distances, run_distances, metrics, times, run_attrs: run_attrs.get( + "shared_buffers", 0 + ), # noqa + "worst": float("inf"), + } } diff --git a/mat_export.py b/mat_export.py new file mode 100644 index 000000000..c2af406fe --- /dev/null +++ b/mat_export.py @@ -0,0 +1,31 @@ +import argparse +import csv + +from ann_benchmarks.datasets import DATASETS, get_dataset +from ann_benchmarks.plotting.utils import compute_metrics_all_runs +from ann_benchmarks.results import load_all_results + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--recompute", action="store_true", + help="Recompute metrics") + parser.add_argument("--batch", action="store_true", + help="Process batch mode results") + args = parser.parse_args() + + datasets = DATASETS.keys() + dfs = [] + for dataset_name in datasets: + print("Looking at dataset", dataset_name) + if len(list(load_all_results(dataset_name, batch_mode=args.batch))) > 0: + results = load_all_results(dataset_name, batch_mode=args.batch) + dataset, _ = get_dataset(dataset_name) + results = compute_metrics_all_runs( + dataset, results, args.recompute) + for res in results: + res["dataset"] = dataset_name + dfs.append(res) + if len(dfs) > 0: + for res in dfs: + print("%s %s %12.3f %12.3f %12.3f" % ( + res["algorithm"], res["parameters"], res["k-nn"], res["qps"], res["shared_buffers"]))