Skip to content

Commit

Permalink
Support float16/bfloat16/sparse vector for bulkwriter
Browse files Browse the repository at this point in the history
Signed-off-by: yhmo <[email protected]>
  • Loading branch information
yhmo committed Jun 14, 2024
1 parent f8398ee commit 7ec842c
Show file tree
Hide file tree
Showing 6 changed files with 324 additions and 84 deletions.
148 changes: 103 additions & 45 deletions examples/example_bulkwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import time
import pandas as pd
import numpy as np
import tensorflow as tf

import logging
logging.basicConfig(level=logging.INFO)
Expand Down Expand Up @@ -51,13 +52,51 @@
ALL_TYPES_COLLECTION_NAME = "all_types_for_bulkwriter"
DIM = 512

def gen_binary_vector():
# optional input for binary vector:
# 1. list of int such as [1, 0, 1, 1, 0, 0, 1, 0]
# 2. numpy array of uint8
def gen_binary_vector(to_numpy_arr):
raw_vector = [random.randint(0, 1) for i in range(DIM)]
binary_vectors = np.packbits(raw_vector, axis=-1).tolist()
return binary_vectors

def gen_float_vector():
return [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.packbits(raw_vector, axis=-1)
return raw_vector

# optional input for float vector:
# 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# 2. numpy array of float32
def gen_float_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.array(raw_vector, dtype="float32")
return raw_vector

# optional input for bfloat16 vector:
# 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# 2. numpy array of bfloat16
def gen_bf16_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return tf.cast(raw_vector, dtype=tf.bfloat16).numpy()
return raw_vector

# optional input for float16 vector:
# 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# 2. numpy array of float16
def gen_fp16_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.array(raw_vector, dtype=np.float16)
return raw_vector

# optional input for sparse vector:
# only accepts dict of integer key and float value such as {1: 2.32, 43: 4.56, 65: 35.43}
# note: no need to sort the keys
def gen_sparse_vector():
raw_vector = {}
dim = random.randint(2, 20)
while len(raw_vector) < dim:
raw_vector[random.randint(0, 10000)] = random.random()
return raw_vector

def create_connection():
print(f"\nCreate connection...")
Expand All @@ -81,7 +120,7 @@ def build_simple_collection():
print(f"Collection '{collection.name}' created")
return collection.schema

def build_all_type_schema(bin_vec: bool, has_array: bool):
def build_all_type_schema(is_numpy: bool):
print(f"\n===================== build all types schema ====================")
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
Expand All @@ -94,12 +133,18 @@ def build_all_type_schema(bin_vec: bool, has_array: bool):
FieldSchema(name="double", dtype=DataType.DOUBLE),
FieldSchema(name="varchar", dtype=DataType.VARCHAR, max_length=512),
FieldSchema(name="json", dtype=DataType.JSON),
FieldSchema(name="vector", dtype=DataType.BINARY_VECTOR, dim=DIM) if bin_vec else FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=DIM),
# from 2.4.0, milvus supports multiple vector fields in one collection
# FieldSchema(name="float_vector", dtype=DataType.FLOAT_VECTOR, dim=DIM),
FieldSchema(name="binary_vector", dtype=DataType.BINARY_VECTOR, dim=DIM),
FieldSchema(name="float16_vector", dtype=DataType.FLOAT16_VECTOR, dim=DIM),
FieldSchema(name="bfloat16_vector", dtype=DataType.BFLOAT16_VECTOR, dim=DIM),
]

if has_array:
# milvus doesn't support parsing array/sparse_vector from numpy file
if not is_numpy:
fields.append(FieldSchema(name="array_str", dtype=DataType.ARRAY, max_capacity=100, element_type=DataType.VARCHAR, max_length=128))
fields.append(FieldSchema(name="array_int", dtype=DataType.ARRAY, max_capacity=100, element_type=DataType.INT64))
fields.append(FieldSchema(name="sparse_vector", dtype=DataType.SPARSE_FLOAT_VECTOR))

schema = CollectionSchema(fields=fields, enable_dynamic_field=True)
return schema
Expand All @@ -118,7 +163,7 @@ def read_sample_data(file_path: str, writer: [LocalBulkWriter, RemoteBulkWriter]

writer.append_row(row)

def local_writer(schema: CollectionSchema, file_type: BulkFileType):
def local_writer_simple(schema: CollectionSchema, file_type: BulkFileType):
print(f"\n===================== local writer ({file_type.name}) ====================")
with LocalBulkWriter(
schema=schema,
Expand All @@ -131,7 +176,7 @@ def local_writer(schema: CollectionSchema, file_type: BulkFileType):

# append rows
for i in range(100000):
local_writer.append_row({"path": f"path_{i}", "vector": gen_float_vector(), "label": f"label_{i}"})
local_writer.append_row({"path": f"path_{i}", "vector": gen_float_vector(i%2==0), "label": f"label_{i}"})

print(f"{local_writer.total_row_count} rows appends")
print(f"{local_writer.buffer_row_count} rows in buffer not flushed")
Expand All @@ -141,7 +186,7 @@ def local_writer(schema: CollectionSchema, file_type: BulkFileType):
print(f"Local writer done! output local files: {batch_files}")


def remote_writer(schema: CollectionSchema, file_type: BulkFileType):
def remote_writer_simple(schema: CollectionSchema, file_type: BulkFileType):
print(f"\n===================== remote writer ({file_type.name}) ====================")
with RemoteBulkWriter(
schema=schema,
Expand All @@ -160,7 +205,7 @@ def remote_writer(schema: CollectionSchema, file_type: BulkFileType):

# append rows
for i in range(10000):
remote_writer.append_row({"path": f"path_{i}", "vector": gen_float_vector(), "label": f"label_{i}"})
remote_writer.append_row({"path": f"path_{i}", "vector": gen_float_vector(i%2==0), "label": f"label_{i}"})

print(f"{remote_writer.total_row_count} rows appends")
print(f"{remote_writer.buffer_row_count} rows in buffer not flushed")
Expand All @@ -174,7 +219,7 @@ def parallel_append(schema: CollectionSchema):
def _append_row(writer: LocalBulkWriter, begin: int, end: int):
try:
for i in range(begin, end):
writer.append_row({"path": f"path_{i}", "vector": gen_float_vector(), "label": f"label_{i}"})
writer.append_row({"path": f"path_{i}", "vector": gen_float_vector(False), "label": f"label_{i}"})
if i%100 == 0:
print(f"{threading.current_thread().name} inserted {i-begin} items")
except Exception as e:
Expand Down Expand Up @@ -224,8 +269,8 @@ def _append_row(writer: LocalBulkWriter, begin: int, end: int):
print("Data is correct")


def all_types_writer(bin_vec: bool, schema: CollectionSchema, file_type: BulkFileType)->list:
print(f"\n===================== all field types ({file_type.name}) binary_vector={bin_vec} ====================")
def all_types_writer(schema: CollectionSchema, file_type: BulkFileType)->list:
print(f"\n===================== all field types ({file_type.name}) ====================")
with RemoteBulkWriter(
schema=schema,
remote_path="bulk_data",
Expand All @@ -251,11 +296,16 @@ def all_types_writer(bin_vec: bool, schema: CollectionSchema, file_type: BulkFil
"double": i/7,
"varchar": f"varchar_{i}",
"json": {"dummy": i, "ok": f"name_{i}"},
"vector": gen_binary_vector() if bin_vec else gen_float_vector(),
# "float_vector": gen_float_vector(False),
"binary_vector": gen_binary_vector(False),
"float16_vector": gen_fp16_vector(False),
"bfloat16_vector": gen_bf16_vector(False),
f"dynamic_{i}": i,
# bulkinsert doesn't support import npy with array field, the below values will be stored into dynamic field
# bulkinsert doesn't support import npy with array field and sparse vector,
# if file_type is numpy, the below values will be stored into dynamic field
"array_str": [f"str_{k}" for k in range(5)],
"array_int": [k for k in range(10)],
"sparse_vector": gen_sparse_vector(),
}
remote_writer.append_row(row)

Expand All @@ -272,11 +322,16 @@ def all_types_writer(bin_vec: bool, schema: CollectionSchema, file_type: BulkFil
"double": np.float64(i/7),
"varchar": f"varchar_{i}",
"json": json.dumps({"dummy": i, "ok": f"name_{i}"}),
"vector": np.array(gen_binary_vector()).astype(np.dtype("int8")) if bin_vec else np.array(gen_float_vector()).astype(np.dtype("float32")),
# "float_vector": gen_float_vector(True),
"binary_vector": gen_binary_vector(True),
"float16_vector": gen_fp16_vector(True),
"bfloat16_vector": gen_bf16_vector(True),
f"dynamic_{i}": i,
# bulkinsert doesn't support import npy with array field, the below values will be stored into dynamic field
# bulkinsert doesn't support import npy with array field and sparse vector,
# if file_type is numpy, the below values will be stored into dynamic field
"array_str": np.array([f"str_{k}" for k in range(5)], np.dtype("str")),
"array_int": np.array([k for k in range(10)], np.dtype("int64")),
"sparse_vector": gen_sparse_vector(),
})

print(f"{remote_writer.total_row_count} rows appends")
Expand Down Expand Up @@ -316,19 +371,29 @@ def call_bulkinsert(schema: CollectionSchema, batch_files: list):
print(f"Collection row number: {collection.num_entities}")


def retrieve_imported_data(bin_vec: bool):
def retrieve_imported_data():
collection = Collection(name=ALL_TYPES_COLLECTION_NAME)
print("Create index...")
index_param = {
"index_type": "BIN_FLAT",
"params": {},
"metric_type": "HAMMING"
} if bin_vec else {
"index_type": "FLAT",
"params": {},
"metric_type": "L2"
}
collection.create_index(field_name="vector", index_params=index_param)
for field in collection.schema.fields:
if (field.dtype == DataType.FLOAT_VECTOR or field.dtype == DataType.FLOAT16_VECTOR
or field.dtype == DataType.BFLOAT16_VECTOR):
collection.create_index(field_name=field.name, index_params={
"index_type": "FLAT",
"params": {},
"metric_type": "L2"
})
elif field.dtype == DataType.BINARY_VECTOR:
collection.create_index(field_name=field.name, index_params={
"index_type": "BIN_FLAT",
"params": {},
"metric_type": "HAMMING"
})
elif field.dtype == DataType.SPARSE_FLOAT_VECTOR:
collection.create_index(field_name=field.name, index_params={
"index_type": "SPARSE_INVERTED_INDEX",
"metric_type": "IP",
"params": {"drop_ratio_build": 0.2}
})

ids = [100, 5000]
print(f"Load collection and query items {ids}")
Expand Down Expand Up @@ -393,28 +458,21 @@ def cloud_bulkinsert():

schema = build_simple_collection()
for file_type in file_types:
local_writer(schema=schema, file_type=file_type)
local_writer_simple(schema=schema, file_type=file_type)

for file_type in file_types:
remote_writer(schema=schema, file_type=file_type)
remote_writer_simple(schema=schema, file_type=file_type)

parallel_append(schema)

# float vectors + all scalar types
# all vector types + all scalar types
for file_type in file_types:
# Note: bulkinsert doesn't support import npy with array field
schema = build_all_type_schema(bin_vec=False, has_array=False if file_type==BulkFileType.NUMPY else True)
batch_files = all_types_writer(bin_vec=False, schema=schema, file_type=file_type)
# Note: bulkinsert doesn't support import npy with array field and sparse vector field
schema = build_all_type_schema(is_numpy= file_type==BulkFileType.NUMPY)
batch_files = all_types_writer(schema=schema, file_type=file_type)
call_bulkinsert(schema, batch_files)
retrieve_imported_data(bin_vec=False)
retrieve_imported_data()

# binary vectors + all scalar types
for file_type in file_types:
# Note: bulkinsert doesn't support import npy with array field
schema = build_all_type_schema(bin_vec=True, has_array=False if file_type == BulkFileType.NUMPY else True)
batch_files = all_types_writer(bin_vec=True, schema=schema, file_type=file_type)
call_bulkinsert(schema, batch_files)
retrieve_imported_data(bin_vec=True)

# # to call cloud bulkinsert api, you need to apply a cloud service from Zilliz Cloud(https://zilliz.com/cloud)
# cloud_bulkinsert()
Expand Down
46 changes: 38 additions & 8 deletions pymilvus/bulk_writer/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,22 @@ def _persist_npy(self, local_path: str, **kwargs):
str_arr.append(json.dumps(val))
self._buffer[k] = str_arr

arr = np.array(self._buffer[k], dtype=dt)
# currently, milvus server doesn't support numpy for sparse vector
if field_schema.dtype == DataType.SPARSE_FLOAT_VECTOR:
self._throw(
f"Failed to persist file {full_file_name},"
f" error: milvus doesn't support parsing sparse vectors from numpy file"
)

# special process for float16 vector, the self._buffer stores bytes for
# float16 vector, convert the bytes to uint8 array
if field_schema.dtype in {DataType.FLOAT16_VECTOR, DataType.BFLOAT16_VECTOR}:
a = []
for b in self._buffer[k]:
a.append(np.frombuffer(b, dtype=dt).tolist())
arr = np.array(a, dtype=dt)
else:
arr = np.array(self._buffer[k], dtype=dt)
np.save(str(full_file_name), arr)
except Exception as e:
self._throw(f"Failed to persist file {full_file_name}, error: {e}")
Expand All @@ -173,7 +188,18 @@ def _persist_json_rows(self, local_path: str, **kwargs):
while row_index < row_count:
row = {}
for k, v in self._buffer.items():
row[k] = v[row_index]
# special process for float16 vector, the self._buffer stores bytes for
# float16 vector, convert the bytes to float list
field_schema = self._fields[k]
if field_schema.dtype in {DataType.FLOAT16_VECTOR, DataType.BFLOAT16_VECTOR}:
dt = (
np.dtype("bfloat16")
if (field_schema.dtype == DataType.BFLOAT16_VECTOR)
else np.float16
)
row[k] = np.frombuffer(v[row_index], dtype=dt).tolist()
else:
row[k] = v[row_index]
rows.append(row)
row_index = row_index + 1

Expand All @@ -196,21 +222,25 @@ def _persist_parquet(self, local_path: str, **kwargs):
data = {}
for k in self._buffer:
field_schema = self._fields[k]
if field_schema.dtype == DataType.JSON:
# for JSON field, store as string array
if field_schema.dtype in {DataType.JSON, DataType.SPARSE_FLOAT_VECTOR}:
# for JSON and SPARSE_VECTOR field, store as string array
str_arr = []
for val in self._buffer[k]:
str_arr.append(json.dumps(val))
data[k] = pd.Series(str_arr, dtype=None)
elif field_schema.dtype == DataType.FLOAT_VECTOR:
elif field_schema.dtype in {DataType.BINARY_VECTOR, DataType.FLOAT_VECTOR}:
arr = []
for val in self._buffer[k]:
arr.append(np.array(val, dtype=np.dtype("float32")))
arr.append(np.array(val, dtype=NUMPY_TYPE_CREATOR[field_schema.dtype.name]))
data[k] = pd.Series(arr)
elif field_schema.dtype == DataType.BINARY_VECTOR:
elif field_schema.dtype in {DataType.FLOAT16_VECTOR, DataType.BFLOAT16_VECTOR}:
# special process for float16 vector, the self._buffer stores bytes for
# float16 vector, convert the bytes to uint8 array
arr = []
for val in self._buffer[k]:
arr.append(np.array(val, dtype=np.dtype("uint8")))
arr.append(
np.frombuffer(val, dtype=NUMPY_TYPE_CREATOR[field_schema.dtype.name])
)
data[k] = pd.Series(arr)
elif field_schema.dtype == DataType.ARRAY:
dt = NUMPY_TYPE_CREATOR[field_schema.element_type.name]
Expand Down
40 changes: 27 additions & 13 deletions pymilvus/bulk_writer/bulk_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,23 @@ def _throw(self, msg: str):
def _verify_vector(self, x: object, field: FieldSchema):
dtype = DataType(field.dtype)
validator = TYPE_VALIDATOR[dtype.name]
dim = field.params["dim"]
if not validator(x, dim):
self._throw(
f"Illegal vector data for vector field: '{field.name}',"
f" dim is not {dim} or type mismatch"
)

return len(x) * 4 if dtype == DataType.FLOAT_VECTOR else len(x)
if dtype != DataType.SPARSE_FLOAT_VECTOR:
dim = field.params["dim"]
try:
origin_list = validator(x, dim)
if dtype == DataType.FLOAT_VECTOR:
return origin_list, dim * 4 # for float vector, each dim occupies 4 bytes
if dtype == DataType.BINARY_VECTOR:
return origin_list, dim / 8 # for binary vector, 8 dim occupies 1 byte
return origin_list, dim * 2 # for float16 vector, each dim occupies 2 bytes
except MilvusException as e:
self._throw(f"Illegal vector data for vector field: '{field.name}': {e.message}")
else:
try:
validator(x)
return x, len(x) * 12 # for sparse vector, each key-value is int-float, 12 bytes
except MilvusException as e:
self._throw(f"Illegal vector data for vector field: '{field.name}': {e.message}")

def _verify_json(self, x: object, field: FieldSchema):
size = 0
Expand Down Expand Up @@ -187,11 +196,16 @@ def _verify_row(self, row: dict):
self._throw(f"The field '{field.name}' is missed in the row")

dtype = DataType(field.dtype)
if dtype in {DataType.BINARY_VECTOR, DataType.FLOAT_VECTOR}:
if isinstance(row[field.name], np.ndarray):
row[field.name] = row[field.name].tolist()

row_size = row_size + self._verify_vector(row[field.name], field)
if dtype in {
DataType.BINARY_VECTOR,
DataType.FLOAT_VECTOR,
DataType.FLOAT16_VECTOR,
DataType.BFLOAT16_VECTOR,
DataType.SPARSE_FLOAT_VECTOR,
}:
origin_list, byte_len = self._verify_vector(row[field.name], field)
row[field.name] = origin_list
row_size = row_size + byte_len
elif dtype == DataType.VARCHAR:
row_size = row_size + self._verify_varchar(row[field.name], field)
elif dtype == DataType.JSON:
Expand Down
Loading

0 comments on commit 7ec842c

Please sign in to comment.