Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zilliz streaming solution #268

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/neurips23.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ jobs:
- algorithm: pinecone
dataset: random-filter-s
track: filter
- algorithm: zilliz
dataset: random-xs
track: streaming
fail-fast: false

steps:
Expand Down
11 changes: 11 additions & 0 deletions neurips23/streaming/zilliz/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM neurips23

RUN apt update
RUN apt install -y software-properties-common
RUN add-apt-repository -y ppa:git-core/ppa
RUN apt update
RUN DEBIAN_FRONTEND=noninteractive apt install -y git make cmake g++ libaio-dev libgoogle-perftools-dev libunwind-dev clang-format libboost-dev libboost-program-options-dev libmkl-full-dev libcpprest-dev python3.10

RUN git clone https://github.com/hhy3/zilliz-bigann.git --branch streaming
RUN pip install ./zilliz-bigann/*.whl

60 changes: 60 additions & 0 deletions neurips23/streaming/zilliz/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
random-xs-clustered:
zilliz:
docker-tag: neurips23-streaming-zilliz
module: neurips23.streaming.zilliz.zilliz
constructor: Zilliz
base-args: ["@metric"]
run-groups:
base:
args: |
[{"R":32, "L":100, "insert_threads":16, "consolidate_threads":16}]
query-args: |
[{"Ls":200, "T":8}]
random-xs:
zilliz:
docker-tag: neurips23-streaming-zilliz
module: neurips23.streaming.zilliz.zilliz
constructor: Zilliz
base-args: ["@metric"]
run-groups:
base:
args: |
[{"R":32, "L":50, "insert_threads":16, "consolidate_threads":16}]
query-args: |
[{"Ls":50, "T":8}]
msturing-10M-clustered:
zilliz:
docker-tag: neurips23-streaming-zilliz
module: neurips23.streaming.zilliz.zilliz
constructor: Zilliz
base-args: ["@metric"]
run-groups:
base:
args: |
[{"R":16, "L":10, "insert_threads":8, "consolidate_threads":8}]
query-args: |
[
{"Ls":100, "T":8}
]
msturing-30M-clustered:
zilliz:
docker-tag: neurips23-streaming-zilliz
module: neurips23.streaming.zilliz.zilliz
constructor: Zilliz
base-args: ["@metric"]
run-groups:
base:
args: |
[
{"R":32, "L":110, "insert_threads":8, "consolidate_threads":8}
]
query-args: |
[
{"Ls":400, "T":8},
{"Ls":450, "T":8},
{"Ls":500, "T":8},
{"Ls":550, "T":8}
]
115 changes: 115 additions & 0 deletions neurips23/streaming/zilliz/zilliz.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from __future__ import absolute_import
import psutil
import os
import time
import numpy as np

import diskannpy
import fast_refine

from neurips23.streaming.base import BaseStreamingANN

class Zilliz(BaseStreamingANN):
def __init__(self, metric, index_params):
self.name = "pyanns"
if (index_params.get("R")==None):
print("Error: missing parameter R")
return
if (index_params.get("L")==None):
print("Error: missing parameter L")
return
self._index_params = index_params
self._metric = metric

self.R = index_params.get("R")
self.L = index_params.get("L")
self.insert_threads = index_params.get("insert_threads")
self.consolidate_threads = index_params.get("consolidate_threads")
self.mx = None
self.mi = None

def index_name(self):
return f"R{self.R}_L{self.L}"

def create_index_dir(self, dataset):
index_dir = os.path.join(os.getcwd(), "data", "indices", "streaming")
os.makedirs(index_dir, mode=0o777, exist_ok=True)
index_dir = os.path.join(index_dir, 'pyanns')
os.makedirs(index_dir, mode=0o777, exist_ok=True)
index_dir = os.path.join(index_dir, dataset.short_name())
os.makedirs(index_dir, mode=0o777, exist_ok=True)
index_dir = os.path.join(index_dir, self.index_name())
os.makedirs(index_dir, mode=0o777, exist_ok=True)
return index_dir

def translate_dist_fn(self, metric):
if metric == 'euclidean':
return 'l2'
elif metric == 'ip':
return 'mips'
else:
raise Exception('Invalid metric')

def translate_dtype(self, dtype:str):
return np.uint8

def setup(self, dtype, max_pts, ndim):
self.index = diskannpy.DynamicMemoryIndex(
distance_metric = self.translate_dist_fn(self._metric),
vector_dtype = self.translate_dtype(dtype),
max_vectors = max_pts,
dimensions = ndim,
graph_degree = self.R,
complexity=self.L,
num_threads = self.insert_threads, #to allocate scratch space for up to 64 search threads
initial_search_complexity = 100
)
self.refiner = fast_refine.Refiner(ndim, max_pts)
self.max_pts = max_pts
print('Index class constructed and ready for update/search')
self.active_indices = set()
self.num_unprocessed_deletes = 0

def quant(self, X, mi, mx):
return np.round(np.clip((X - mi) / (mx - mi) * 127.0, 0.0, 127.0)).astype('uint8')

def insert(self, X, ids):
if self.mi is None:
self.mi = X.min()
self.mx = X.max()

self.refiner.batch_insert(X, ids)
X = self.quant(X, self.mi, self.mx)
self.active_indices.update(ids+1)
print('#active pts', len(self.active_indices), '#unprocessed deletes', self.num_unprocessed_deletes)
if len(self.active_indices) + self.num_unprocessed_deletes > self.max_pts:
self.index.consolidate_delete()
self.num_unprocessed_deletes = 0

self.index.batch_insert(X, ids+1)

def delete(self, ids):
self.refiner.batch_delete(ids)
for id in ids:
self.index.mark_deleted(id+1)
self.active_indices.difference_update(ids+1)
self.num_unprocessed_deletes += len(ids)

def query(self, X, k):
"""Carry out a batch query for k-NN of query set X."""
nq, d = X.shape
Xq = self.quant(X, self.mi, self.mx)
k_mul = 5
k_reorder = k * k_mul
I, _ = self.index.batch_search(
Xq, k_reorder, self.Ls, self.search_threads)
I = I - 1
self.res = self.refiner.batch_refine(X, I, k).reshape(nq, k)

def set_query_arguments(self, query_args):
self._query_args = query_args
self.Ls = 0 if query_args.get("Ls") == None else query_args.get("Ls")
self.search_threads = self._query_args.get("T")

def __str__(self):
return f'zilliz({self.index_name(), self._query_args})'
Loading