
Commit

Merge pull request erikbern#6 from timescale/mat/change3

Mat/change3
jgpruitt authored May 1, 2024
2 parents 8916d49 + 2401aef commit 678f2b3
Showing 2 changed files with 40 additions and 4 deletions.
1 change: 1 addition & 0 deletions ann_benchmarks/algorithms/tsvector/config.yml
@@ -15,6 +15,7 @@ float:
         [1.2], # max_alpha
         [1], # use_bq
         [0], # pq_vector_length
+        [0], # num_bits_per_dimension - 0 means use the default
       ]
       query_args: [[75, 150, 200, 300, 400, 500, 600, 700, 800], [75, 150, 200, 300, 400]] # query_search_list_size
 #tsvector2:
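Note on the new column: the benchmark harness appears to expand each args group into positional constructor arguments, and each query_args combination into a set_query_arguments() call (the second query_args list lines up with the query_rescore parameter seen later in this diff). A minimal sketch of one expanded combination, with made-up metric, connection string, and values, not part of the commit:

# Hypothetical expansion of one args/query_args combination from config.yml.
algo = TSVector(
    "angular",                 # metric (assumed to be supplied by the harness)
    "postgresql://localhost",  # connection_str (made up)
    50,                        # num_neighbors
    100,                       # search_list_size
    1.2,                       # max_alpha
    1,                         # use_bq
    0,                         # pq_vector_length
    0,                         # num_bits_per_dimension (new; 0 keeps the extension default)
)
algo.set_query_arguments(400, 150)  # query_search_list_size, query_rescore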
43 changes: 39 additions & 4 deletions ann_benchmarks/algorithms/tsvector/module.py
@@ -36,20 +36,22 @@
 CHUNK_TIME_STEP = timedelta(days=1)
 CHUNK_TIME_INTERVAL = "'1d'::interval"
 STORAGE_LAYOUT = "memory_optimized"
+PREWARM = True
 
 assert (EMBEDDINGS_PER_COPY_BATCH <= EMBEDDINGS_PER_CHUNK)
 
 
 class TSVector(BaseANN):
     def __init__(self, metric: str, connection_str: str, num_neighbors: int, search_list_size: int,
-                 max_alpha: float, use_bq: int, pq_vector_length: int):
+                 max_alpha: float, use_bq: int, pq_vector_length: int, num_bits_per_dimension: int):
         self._metric: str = metric
         self._connection_str: str = connection_str
         self._num_neighbors: int = num_neighbors
         self._search_list_size: int = search_list_size
         self._max_alpha: float = max_alpha
         self._use_bq: bool = (use_bq == 1)
         self._pq_vector_length: int = pq_vector_length
+        self._num_bits_per_dimension: int = num_bits_per_dimension
         self._query_search_list_size: Optional[int] = None
         self._query_rescore: Optional[int] = None
         self._query_shared_buffers_hit = 0
@@ -240,7 +242,8 @@ def index_table(self) -> None:
         with conn.cursor() as cur:
             if self._use_bq:
                 cur.execute(f"""create index on only public.items using tsv (embedding)
-                    with (num_neighbors = {self._num_neighbors}, search_list_size = {self._search_list_size}, max_alpha={self._max_alpha}, storage_layout='{STORAGE_LAYOUT}')"""
+                    with (num_neighbors = {self._num_neighbors}, search_list_size = {self._search_list_size}, max_alpha={self._max_alpha},
+                    num_bits_per_dimension={self._num_bits_per_dimension}, storage_layout='{STORAGE_LAYOUT}')"""
                             )
             elif self._pq_vector_length < 1:
                 cur.execute(f"""create index on only public.items using tsv (embedding)
@@ -259,7 +262,8 @@ def index_chunk(self, chunk: str) -> Optional[Exception]:
         with conn.cursor() as cur:
             if self._use_bq:
                 cur.execute(f"""create index on only {chunk} using tsv (embedding)
-                    with (num_neighbors = {self._num_neighbors}, search_list_size = {self._search_list_size}, max_alpha={self._max_alpha}, storage_layout='{STORAGE_LAYOUT}')"""
+                    with (num_neighbors = {self._num_neighbors}, search_list_size = {self._search_list_size}, max_alpha={self._max_alpha},
+                    num_bits_per_dimension={self._num_bits_per_dimension}, storage_layout='{STORAGE_LAYOUT}')"""
                             )
             elif self._pq_vector_length < 1:
                 cur.execute(f"""create index on only {chunk} using tsv (embedding)
@@ -303,6 +307,34 @@ def index_chunks(self, chunks: list[str]) -> None:
         with self._pool.connection() as conn:
             self.log_stop(conn, id)
 
+    def prewarm_heap(self, conn: psycopg.Connection) -> None:
+        if PREWARM:
+            with conn.cursor() as cur:
+                cur.execute(
+                    "select format($$%I.%I$$, chunk_schema, chunk_name) from timescaledb_information.chunks k where hypertable_name = 'items'")
+                chunks = [row[0] for row in cur]
+                for chunk in chunks:
+                    print(f"prewarming chunk heap {chunk}")
+                    cur.execute(
+                        f"select pg_prewarm('{chunk}'::regclass, mode=>'buffer')")
+                    cur.fetchall()
+
+    def prewarm_index(self, conn: psycopg.Connection) -> None:
+        if PREWARM:
+            with conn.cursor() as cur:
+                cur.execute("""
+                    select format($$%I.%I$$, x.schemaname, x.indexname)
+                    from timescaledb_information.chunks k
+                    inner join pg_catalog.pg_indexes x on (k.chunk_schema = x.schemaname and k.chunk_name = x.tablename)
+                    where x.indexname ilike '%_embedding_%'
+                    and k.hypertable_name = 'items'""")
+                chunks = [row[0] for row in cur]
+                for chunk_index in chunks:
+                    print(f"prewarming chunk index {chunk_index}")
+                    cur.execute(
+                        f"select pg_prewarm('{chunk_index}'::regclass, mode=>'buffer')")
+                    cur.fetchall()
+
     def fit(self, X: numpy.array) -> None:
         # have to create the extensions before starting the connection pool
         with psycopg.connect(self._connection_str) as conn:
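Note: pg_prewarm() is the stock PostgreSQL extension function; with mode=>'buffer' it reads every block of the named relation into shared_buffers and returns the number of blocks read, which is why each loop iteration drains the result with fetchall(). The standalone equivalent of one loop iteration, against a made-up chunk name:

-- illustrative; the chunk name here is made up
select pg_prewarm('_timescaledb_internal._hyper_1_1_chunk'::regclass, mode => 'buffer');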
@@ -332,6 +364,9 @@ def set_query_arguments(self, query_search_list_size, query_rescore):
         self._pool.close()
         self._pool = None
         self.start_pool()
+        with self._pool.connection() as conn:
+            self.prewarm_heap(conn)
+            self.prewarm_index(conn)
 
     def get_memory_usage(self) -> Optional[float]:
         return psutil.Process().memory_info().rss / 1024
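The prewarm calls land right after the pool restart, presumably so the first timed queries of a run do not pay a cold-cache penalty. One way to verify residency afterwards, assuming the separate pg_buffercache extension is installed (not something this commit uses):

-- illustrative check; requires: create extension pg_buffercache;
select count(*) as cached_blocks
from pg_buffercache b
join pg_class c on b.relfilenode = pg_relation_filenode(c.oid)
where c.relname = 'items';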
@@ -388,4 +423,4 @@ def get_batch_latencies(self) -> numpy.array:
         return self.latencies
 
     def __str__(self):
-        return f"algorithm=TSVector num_neighbors={self._num_neighbors} search_list_size={self._search_list_size} max_alpha={self._max_alpha} use_bq={self._use_bq} pq_vector_length={self._pq_vector_length} query_search_list_size={self._query_search_list_size} query_rescore={self._query_rescore}"
+        return f"algorithm=TSVector num_neighbors={self._num_neighbors} search_list_size={self._search_list_size} max_alpha={self._max_alpha} num_bits_per_dimension={self._num_bits_per_dimension} use_bq={self._use_bq} pq_vector_length={self._pq_vector_length} query_search_list_size={self._query_search_list_size} query_rescore={self._query_rescore}"
