Merge pull request erikbern#3 from timescale/mat/output
Allow tracking shared_buffers and iterating query params
jgpruitt authored Apr 23, 2024
2 parents d8be6fa + c80ad80 commit 8764530
Showing 4 changed files with 83 additions and 7 deletions.
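The change measures buffer traffic by snapshotting pg_stat_statements counters before and after a batch of queries and reporting the delta. A minimal sketch of that idea (the helper name is hypothetical, and it assumes the pg_stat_statements extension is enabled in the benchmarked database):

import psycopg

def shared_blocks_touched(conn: psycopg.Connection, query_pattern: str) -> int:
    # Sum shared_blks_hit + shared_blks_read over statements matching query_pattern.
    with conn.cursor() as cur:
        cur.execute(
            "select coalesce(sum(shared_blks_hit + shared_blks_read), 0) "
            "from pg_stat_statements where query like %s",
            (query_pattern,),
        )
        return cur.fetchone()[0]

# before = shared_blocks_touched(conn, '%embedding <=>%')  # illustrative pattern
# ... run the batch of benchmark queries ...
# after = shared_blocks_touched(conn, '%embedding <=>%')
# reported "shared_buffers" metric = after - before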
2 changes: 1 addition & 1 deletion ann_benchmarks/algorithms/tsvector/config.yml
@@ -16,7 +16,7 @@ float:
[1], # use_bq
[0], # pq_vector_length
]
query_args: [[75]] # query_search_list_size
query_args: [[75, 150, 200, 300, 400, 500, 600, 700, 800], [75, 150, 200, 300, 400]] # query_search_list_size, query_rescore
#tsvector2:
# args: [[50], [10], [1.2]]
# query_args: [[10, 50, 50]]
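The second list feeds the new query_rescore argument; each run is assumed to get one (query_search_list_size, query_rescore) pair, matching the two-argument set_query_arguments in module.py. A rough sketch of the expansion, assuming ann-benchmarks takes the Cartesian product of nested query_args lists:

import itertools

query_args = [[75, 150, 200, 300, 400, 500, 600, 700, 800], [75, 150, 200, 300, 400]]
for query_search_list_size, query_rescore in itertools.product(*query_args):
    # each combination would become one set_query_arguments(...) call
    print(query_search_list_size, query_rescore)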
41 changes: 38 additions & 3 deletions ann_benchmarks/algorithms/tsvector/module.py
@@ -19,7 +19,6 @@
#QUERY = """with x as materialized (select id, embedding <=> %s as distance from public.items order by 2 limit 100) select id from x order by distance limit %s"""

CONNECTION_SETTINGS = [
"set tsv.query_rescore = 25;",
"set work_mem = '2GB';",
"set maintenance_work_mem = '8GB';"
"set max_parallel_workers_per_gather = 0;",
@@ -49,6 +48,8 @@ def __init__(self, metric: str, connection_str: str, num_neighbors: int, search_
self._use_bq: bool = (use_bq == 1)
self._pq_vector_length: int = pq_vector_length
self._query_search_list_size: Optional[int] = None
self._query_rescore: Optional[int] = None
self._query_shared_buffers: int = 0
self._pool: Optional[ConnectionPool] = None
if metric == "angular":
self._query: str = QUERY
@@ -74,6 +75,9 @@ def configure(conn):
if self._query_search_list_size is not None:
conn.execute("set tsv.query_search_list_size = %d" % self._query_search_list_size)
print("set tsv.query_search_list_size = %d" % self._query_search_list_size)
if self._query_rescore is not None:
conn.execute("set tsv.query_rescore = %d" % self._query_rescore)
print("set tsv.query_rescore = %d" % self._query_rescore)
for setting in CONNECTION_SETTINGS:
conn.execute(setting)
conn.commit()
@@ -85,6 +89,24 @@ def does_table_exist(self, conn: psycopg.Connection) -> bool:
cur.execute("select count(*) from pg_class where relname = 'items'")
table_count = cur.fetchone()[0]
return table_count > 0

def shared_buffers(self, conn: psycopg.Connection) -> int:
shared_buffers = 0
with conn.cursor() as cur:
sql_query = QUERY % ("$1", "%2")
cur.execute(f"""
select
shared_blks_hit + shared_blks_read
from pg_stat_statements
where queryid = (select queryid
from pg_stat_statements
where userid = (select oid from pg_authid where rolname = current_role)
and query like '{sql_query}'
);""")
res = cur.fetchone()
if res is not None:
shared_buffers = res[0]
return shared_buffers

def create_table(self, conn: psycopg.Connection, dimensions: int) -> None:
with conn.cursor() as cur:
@@ -276,8 +298,9 @@ def fit(self, X: numpy.array) -> None:
if len(chunks) > 0:
self.index_chunks(chunks)

def set_query_arguments(self, query_search_list_size):
def set_query_arguments(self, query_search_list_size, query_rescore):
self._query_search_list_size = query_search_list_size
self._query_rescore = query_rescore
#close and restart the pool to apply the new settings
self._pool.close()
self._pool = None
@@ -297,6 +320,10 @@ def query(self, q: numpy.array, n: int) -> tuple[numpy.array, float]:

def batch_query(self, X: numpy.array, n: int) -> None:
threads = min(MAX_BATCH_QUERY_THREADS, X.size)

with self._pool.connection() as conn:
shared_buffers_start = self.shared_buffers(conn)

results = numpy.empty((X.shape[0], n), dtype=int)
latencies = numpy.empty(X.shape[0], dtype=float)
with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
@@ -312,11 +339,19 @@ def batch_query(self, X: numpy.array, n: int) -> None:
self.results = results
self.latencies = latencies

with self._pool.connection() as conn:
shared_buffers_end = self.shared_buffers(conn)

self._query_shared_buffers = shared_buffers_end - shared_buffers_start

def get_additional(self):
return {"shared_buffers": self._query_shared_buffers}

def get_batch_results(self) -> numpy.array:
return self.results

def get_batch_latencies(self) -> numpy.array:
return self.latencies

def __str__(self):
return f"TSVector(num_neighbors={self._num_neighbors}, search_list_size={self._search_list_size}, max_alpha={self._max_alpha}, use_bq={self._use_bq}, pq_vector_length={self._pq_vector_length}, query_search_list_size={self._query_search_list_size})"
return f"TSVector(num_neighbors={self._num_neighbors}, search_list_size={self._search_list_size}, max_alpha={self._max_alpha}, use_bq={self._use_bq}, pq_vector_length={self._pq_vector_length}, query_search_list_size={self._query_search_list_size}, query_rescore={self._query_rescore})"
16 changes: 13 additions & 3 deletions ann_benchmarks/plotting/metrics.py
@@ -27,7 +27,8 @@ def knn(dataset_distances, run_distances, count, metrics, epsilon=1e-3):
if "knn" not in metrics:
print("Computing knn metrics")
knn_metrics = metrics.create_group("knn")
mean, std, recalls = get_recall_values(dataset_distances, run_distances, count, knn_threshold, epsilon)
mean, std, recalls = get_recall_values(
dataset_distances, run_distances, count, knn_threshold, epsilon)
knn_metrics.attrs["mean"] = mean
knn_metrics.attrs["std"] = std
knn_metrics["recalls"] = recalls
@@ -41,7 +42,8 @@ def epsilon(dataset_distances, run_distances, count, metrics, epsilon=0.01):
if s not in metrics:
print("Computing epsilon metrics")
epsilon_metrics = metrics.create_group(s)
mean, std, recalls = get_recall_values(dataset_distances, run_distances, count, epsilon_threshold, epsilon)
mean, std, recalls = get_recall_values(
dataset_distances, run_distances, count, epsilon_threshold, epsilon)
epsilon_metrics.attrs["mean"] = mean
epsilon_metrics.attrs["std"] = std
epsilon_metrics["recalls"] = recalls
@@ -61,7 +63,8 @@ def rel(dataset_distances, run_distances, metrics):
if total_closest_distance < 0.01:
metrics.attrs["rel"] = float("inf")
else:
metrics.attrs["rel"] = total_candidate_distance / total_closest_distance
metrics.attrs["rel"] = total_candidate_distance / \
total_closest_distance
else:
print("Found cached result")
return metrics.attrs["rel"]
@@ -214,4 +217,11 @@ def dist_computations(queries, attrs):
/ queries_per_second(true_distances, run_attrs), # noqa
"worst": float("inf"),
},
"shared_buffers": {
"description": "Shared Buffers",
"function": lambda true_distances, run_distances, metrics, times, run_attrs: run_attrs.get(
"shared_buffers", 0
), # noqa
"worst": float("inf"),
}
}
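The new metric is a pass-through: it reads whatever the algorithm reported via get_additional(), which the runner is assumed to merge into the run attributes. A tiny sketch with hypothetical values:

# Hypothetical values, mirroring what TSVector.get_additional() returns.
run_attrs = {"shared_buffers": 123456}
print(run_attrs.get("shared_buffers", 0))  # -> 123456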
31 changes: 31 additions & 0 deletions mat_export.py
@@ -0,0 +1,31 @@
import argparse
import csv

from ann_benchmarks.datasets import DATASETS, get_dataset
from ann_benchmarks.plotting.utils import compute_metrics_all_runs
from ann_benchmarks.results import load_all_results

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--recompute", action="store_true",
help="Recompute metrics")
parser.add_argument("--batch", action="store_true",
help="Process batch mode results")
args = parser.parse_args()

datasets = DATASETS.keys()
dfs = []
for dataset_name in datasets:
print("Looking at dataset", dataset_name)
if len(list(load_all_results(dataset_name, batch_mode=args.batch))) > 0:
results = load_all_results(dataset_name, batch_mode=args.batch)
dataset, _ = get_dataset(dataset_name)
results = compute_metrics_all_runs(
dataset, results, args.recompute)
for res in results:
res["dataset"] = dataset_name
dfs.append(res)
if len(dfs) > 0:
for res in dfs:
print("%s %s %12.3f %12.3f %12.3f" % (
res["algorithm"], res["parameters"], res["k-nn"], res["qps"], res["shared_buffers"]))
