Support float16/bfloat16/sparse vector for bulkwriter #2128

Merged: 1 commit, Jun 14, 2024
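In short, this change lets the bulk writer accept float16, bfloat16, and sparse vectors alongside the existing float and binary vector fields, and updates the example script accordingly. Below is a minimal usage sketch distilled from the updated example; the top-level imports, the LocalBulkWriter arguments, and the commit()/batch_files calls are assumptions about a recent pymilvus release, and connection/collection setup is omitted.

import random
import numpy as np
from pymilvus import CollectionSchema, FieldSchema, DataType, LocalBulkWriter, BulkFileType

DIM = 8  # small dimension, for illustration only

# schema containing the newly supported vector types
schema = CollectionSchema(fields=[
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
    FieldSchema(name="float16_vector", dtype=DataType.FLOAT16_VECTOR, dim=DIM),
    FieldSchema(name="bfloat16_vector", dtype=DataType.BFLOAT16_VECTOR, dim=DIM),
    FieldSchema(name="sparse_vector", dtype=DataType.SPARSE_FLOAT_VECTOR),
])

with LocalBulkWriter(schema=schema, local_path="/tmp/bulk_data",
                     file_type=BulkFileType.PARQUET) as writer:
    for i in range(10):
        writer.append_row({
            "id": i,
            # float16/bfloat16 fields take a plain list of floats or a numpy array
            "float16_vector": np.array([random.random() for _ in range(DIM)], dtype=np.float16),
            "bfloat16_vector": [random.random() for _ in range(DIM)],
            # sparse vectors are a dict of {int index: float value}; keys need not be sorted
            "sparse_vector": {random.randint(0, 1000): random.random() for _ in range(3)},
        })
    writer.commit()
    print(writer.batch_files)

The same row dicts work with RemoteBulkWriter; only the writer construction differs.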
148 changes: 103 additions & 45 deletions examples/example_bulkwriter.py
@@ -17,6 +17,7 @@
import time
import pandas as pd
import numpy as np
import tensorflow as tf

import logging
logging.basicConfig(level=logging.INFO)
@@ -51,13 +52,51 @@
ALL_TYPES_COLLECTION_NAME = "all_types_for_bulkwriter"
DIM = 512

def gen_binary_vector():
# optional input for binary vector:
# 1. list of int such as [1, 0, 1, 1, 0, 0, 1, 0]
# 2. numpy array of uint8
def gen_binary_vector(to_numpy_arr):
raw_vector = [random.randint(0, 1) for i in range(DIM)]
binary_vectors = np.packbits(raw_vector, axis=-1).tolist()
return binary_vectors

def gen_float_vector():
return [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.packbits(raw_vector, axis=-1)
return raw_vector

# optional input for float vector:
# 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# 2. numpy array of float32
def gen_float_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.array(raw_vector, dtype="float32")
return raw_vector

# optional input for bfloat16 vector:
# 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# 2. numpy array of bfloat16
def gen_bf16_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return tf.cast(raw_vector, dtype=tf.bfloat16).numpy()
return raw_vector

# optional input for float16 vector:
# 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# 2. numpy array of float16
def gen_fp16_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.array(raw_vector, dtype=np.float16)
return raw_vector

# optional input for sparse vector:
# only accepts dict of integer key and float value such as {1: 2.32, 43: 4.56, 65: 35.43}
# note: no need to sort the keys
def gen_sparse_vector():
raw_vector = {}
dim = random.randint(2, 20)
while len(raw_vector) < dim:
raw_vector[random.randint(0, 10000)] = random.random()
return raw_vector

def create_connection():
print(f"\nCreate connection...")
@@ -81,7 +120,7 @@ def build_simple_collection():
print(f"Collection '{collection.name}' created")
return collection.schema

def build_all_type_schema(bin_vec: bool, has_array: bool):
def build_all_type_schema(is_numpy: bool):
print(f"\n===================== build all types schema ====================")
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
@@ -94,12 +133,18 @@ def build_all_type_schema(bin_vec: bool, has_array: bool):
FieldSchema(name="double", dtype=DataType.DOUBLE),
FieldSchema(name="varchar", dtype=DataType.VARCHAR, max_length=512),
FieldSchema(name="json", dtype=DataType.JSON),
FieldSchema(name="vector", dtype=DataType.BINARY_VECTOR, dim=DIM) if bin_vec else FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=DIM),
# from 2.4.0, milvus supports multiple vector fields in one collection
# FieldSchema(name="float_vector", dtype=DataType.FLOAT_VECTOR, dim=DIM),
FieldSchema(name="binary_vector", dtype=DataType.BINARY_VECTOR, dim=DIM),
FieldSchema(name="float16_vector", dtype=DataType.FLOAT16_VECTOR, dim=DIM),
FieldSchema(name="bfloat16_vector", dtype=DataType.BFLOAT16_VECTOR, dim=DIM),
]

if has_array:
# milvus doesn't support parsing array/sparse_vector from numpy file
if not is_numpy:
fields.append(FieldSchema(name="array_str", dtype=DataType.ARRAY, max_capacity=100, element_type=DataType.VARCHAR, max_length=128))
fields.append(FieldSchema(name="array_int", dtype=DataType.ARRAY, max_capacity=100, element_type=DataType.INT64))
fields.append(FieldSchema(name="sparse_vector", dtype=DataType.SPARSE_FLOAT_VECTOR))

schema = CollectionSchema(fields=fields, enable_dynamic_field=True)
return schema
@@ -118,7 +163,7 @@ def read_sample_data(file_path: str, writer: [LocalBulkWriter, RemoteBulkWriter]

writer.append_row(row)

def local_writer(schema: CollectionSchema, file_type: BulkFileType):
def local_writer_simple(schema: CollectionSchema, file_type: BulkFileType):
print(f"\n===================== local writer ({file_type.name}) ====================")
with LocalBulkWriter(
schema=schema,
@@ -131,7 +176,7 @@ def local_writer(schema: CollectionSchema, file_type: BulkFileType):

# append rows
for i in range(100000):
local_writer.append_row({"path": f"path_{i}", "vector": gen_float_vector(), "label": f"label_{i}"})
local_writer.append_row({"path": f"path_{i}", "vector": gen_float_vector(i%2==0), "label": f"label_{i}"})

print(f"{local_writer.total_row_count} rows appends")
print(f"{local_writer.buffer_row_count} rows in buffer not flushed")
@@ -141,7 +186,7 @@ def local_writer(schema: CollectionSchema, file_type: BulkFileType):
print(f"Local writer done! output local files: {batch_files}")


def remote_writer(schema: CollectionSchema, file_type: BulkFileType):
def remote_writer_simple(schema: CollectionSchema, file_type: BulkFileType):
print(f"\n===================== remote writer ({file_type.name}) ====================")
with RemoteBulkWriter(
schema=schema,
@@ -160,7 +205,7 @@ def remote_writer(schema: CollectionSchema, file_type: BulkFileType):

# append rows
for i in range(10000):
remote_writer.append_row({"path": f"path_{i}", "vector": gen_float_vector(), "label": f"label_{i}"})
remote_writer.append_row({"path": f"path_{i}", "vector": gen_float_vector(i%2==0), "label": f"label_{i}"})

print(f"{remote_writer.total_row_count} rows appends")
print(f"{remote_writer.buffer_row_count} rows in buffer not flushed")
@@ -174,7 +219,7 @@ def parallel_append(schema: CollectionSchema):
def _append_row(writer: LocalBulkWriter, begin: int, end: int):
try:
for i in range(begin, end):
writer.append_row({"path": f"path_{i}", "vector": gen_float_vector(), "label": f"label_{i}"})
writer.append_row({"path": f"path_{i}", "vector": gen_float_vector(False), "label": f"label_{i}"})
if i%100 == 0:
print(f"{threading.current_thread().name} inserted {i-begin} items")
except Exception as e:
@@ -224,8 +269,8 @@ def _append_row(writer: LocalBulkWriter, begin: int, end: int):
print("Data is correct")


def all_types_writer(bin_vec: bool, schema: CollectionSchema, file_type: BulkFileType)->list:
print(f"\n===================== all field types ({file_type.name}) binary_vector={bin_vec} ====================")
def all_types_writer(schema: CollectionSchema, file_type: BulkFileType)->list:
print(f"\n===================== all field types ({file_type.name}) ====================")
with RemoteBulkWriter(
schema=schema,
remote_path="bulk_data",
@@ -251,11 +296,16 @@ def all_types_writer(bin_vec: bool, schema: CollectionSchema, file_type: BulkFil
"double": i/7,
"varchar": f"varchar_{i}",
"json": {"dummy": i, "ok": f"name_{i}"},
"vector": gen_binary_vector() if bin_vec else gen_float_vector(),
# "float_vector": gen_float_vector(False),
"binary_vector": gen_binary_vector(False),
"float16_vector": gen_fp16_vector(False),
"bfloat16_vector": gen_bf16_vector(False),
f"dynamic_{i}": i,
# bulkinsert doesn't support import npy with array field, the below values will be stored into dynamic field
# bulkinsert doesn't support importing npy files with array fields or sparse vectors;
# if file_type is numpy, the values below will be stored in the dynamic field
"array_str": [f"str_{k}" for k in range(5)],
"array_int": [k for k in range(10)],
"sparse_vector": gen_sparse_vector(),
}
remote_writer.append_row(row)

@@ -272,11 +322,16 @@ def all_types_writer(bin_vec: bool, schema: CollectionSchema, file_type: BulkFil
"double": np.float64(i/7),
"varchar": f"varchar_{i}",
"json": json.dumps({"dummy": i, "ok": f"name_{i}"}),
"vector": np.array(gen_binary_vector()).astype(np.dtype("int8")) if bin_vec else np.array(gen_float_vector()).astype(np.dtype("float32")),
# "float_vector": gen_float_vector(True),
"binary_vector": gen_binary_vector(True),
"float16_vector": gen_fp16_vector(True),
"bfloat16_vector": gen_bf16_vector(True),
f"dynamic_{i}": i,
# bulkinsert doesn't support import npy with array field, the below values will be stored into dynamic field
# bulkinsert doesn't support importing npy files with array fields or sparse vectors;
# if file_type is numpy, the values below will be stored in the dynamic field
"array_str": np.array([f"str_{k}" for k in range(5)], np.dtype("str")),
"array_int": np.array([k for k in range(10)], np.dtype("int64")),
"sparse_vector": gen_sparse_vector(),
})

print(f"{remote_writer.total_row_count} rows appends")
@@ -316,19 +371,29 @@ def call_bulkinsert(schema: CollectionSchema, batch_files: list):
print(f"Collection row number: {collection.num_entities}")


def retrieve_imported_data(bin_vec: bool):
def retrieve_imported_data():
collection = Collection(name=ALL_TYPES_COLLECTION_NAME)
print("Create index...")
index_param = {
"index_type": "BIN_FLAT",
"params": {},
"metric_type": "HAMMING"
} if bin_vec else {
"index_type": "FLAT",
"params": {},
"metric_type": "L2"
}
collection.create_index(field_name="vector", index_params=index_param)
for field in collection.schema.fields:
if (field.dtype == DataType.FLOAT_VECTOR or field.dtype == DataType.FLOAT16_VECTOR
or field.dtype == DataType.BFLOAT16_VECTOR):
collection.create_index(field_name=field.name, index_params={
"index_type": "FLAT",
"params": {},
"metric_type": "L2"
})
elif field.dtype == DataType.BINARY_VECTOR:
collection.create_index(field_name=field.name, index_params={
"index_type": "BIN_FLAT",
"params": {},
"metric_type": "HAMMING"
})
elif field.dtype == DataType.SPARSE_FLOAT_VECTOR:
collection.create_index(field_name=field.name, index_params={
"index_type": "SPARSE_INVERTED_INDEX",
"metric_type": "IP",
"params": {"drop_ratio_build": 0.2}
})

ids = [100, 5000]
print(f"Load collection and query items {ids}")
@@ -393,28 +458,21 @@ def cloud_bulkinsert():

schema = build_simple_collection()
for file_type in file_types:
local_writer(schema=schema, file_type=file_type)
local_writer_simple(schema=schema, file_type=file_type)

for file_type in file_types:
remote_writer(schema=schema, file_type=file_type)
remote_writer_simple(schema=schema, file_type=file_type)

parallel_append(schema)

# float vectors + all scalar types
# all vector types + all scalar types
for file_type in file_types:
# Note: bulkinsert doesn't support import npy with array field
schema = build_all_type_schema(bin_vec=False, has_array=False if file_type==BulkFileType.NUMPY else True)
batch_files = all_types_writer(bin_vec=False, schema=schema, file_type=file_type)
# Note: bulkinsert doesn't support import npy with array field and sparse vector field
schema = build_all_type_schema(is_numpy= file_type==BulkFileType.NUMPY)
batch_files = all_types_writer(schema=schema, file_type=file_type)
call_bulkinsert(schema, batch_files)
retrieve_imported_data(bin_vec=False)
retrieve_imported_data()

# binary vectors + all scalar types
for file_type in file_types:
# Note: bulkinsert doesn't support import npy with array field
schema = build_all_type_schema(bin_vec=True, has_array=False if file_type == BulkFileType.NUMPY else True)
batch_files = all_types_writer(bin_vec=True, schema=schema, file_type=file_type)
call_bulkinsert(schema, batch_files)
retrieve_imported_data(bin_vec=True)

# # to call the cloud bulkinsert api, you need to apply for a cloud service from Zilliz Cloud (https://zilliz.com/cloud)
# cloud_bulkinsert()
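For context, call_bulkinsert (collapsed above) hands the writer's output batches to the server-side import. A rough sketch of that step, reusing ALL_TYPES_COLLECTION_NAME and batch_files from the example; the utility calls and the shape of the returned state object are assumptions about the pymilvus client rather than part of this diff.

from pymilvus import utility

# one import task per batch of files produced by the writer
task_ids = [
    utility.do_bulk_insert(collection_name=ALL_TYPES_COLLECTION_NAME, files=files)
    for files in batch_files
]

# check how far each import task has progressed
for task_id in task_ids:
    state = utility.get_bulk_insert_state(task_id=task_id)
    print(f"bulkinsert task {task_id}: {state}")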
46 changes: 38 additions & 8 deletions pymilvus/bulk_writer/buffer.py
@@ -149,7 +149,22 @@ def _persist_npy(self, local_path: str, **kwargs):
str_arr.append(json.dumps(val))
self._buffer[k] = str_arr

arr = np.array(self._buffer[k], dtype=dt)
# currently, milvus server doesn't support numpy for sparse vector
if field_schema.dtype == DataType.SPARSE_FLOAT_VECTOR:
self._throw(
f"Failed to persist file {full_file_name},"
f" error: milvus doesn't support parsing sparse vectors from numpy file"
)

# special process for float16 vector, the self._buffer stores bytes for
# float16 vector, convert the bytes to uint8 array
if field_schema.dtype in {DataType.FLOAT16_VECTOR, DataType.BFLOAT16_VECTOR}:
a = []
for b in self._buffer[k]:
a.append(np.frombuffer(b, dtype=dt).tolist())
arr = np.array(a, dtype=dt)
else:
arr = np.array(self._buffer[k], dtype=dt)
np.save(str(full_file_name), arr)
except Exception as e:
self._throw(f"Failed to persist file {full_file_name}, error: {e}")
@@ -173,7 +188,18 @@ def _persist_json_rows(self, local_path: str, **kwargs):
while row_index < row_count:
row = {}
for k, v in self._buffer.items():
row[k] = v[row_index]
# special process for float16 vector, the self._buffer stores bytes for
# float16 vector, convert the bytes to float list
field_schema = self._fields[k]
if field_schema.dtype in {DataType.FLOAT16_VECTOR, DataType.BFLOAT16_VECTOR}:
dt = (
np.dtype("bfloat16")
if (field_schema.dtype == DataType.BFLOAT16_VECTOR)
else np.float16
)
row[k] = np.frombuffer(v[row_index], dtype=dt).tolist()
else:
row[k] = v[row_index]
rows.append(row)
row_index = row_index + 1

@@ -196,21 +222,25 @@ def _persist_parquet(self, local_path: str, **kwargs):
data = {}
for k in self._buffer:
field_schema = self._fields[k]
if field_schema.dtype == DataType.JSON:
# for JSON field, store as string array
if field_schema.dtype in {DataType.JSON, DataType.SPARSE_FLOAT_VECTOR}:
# for JSON and SPARSE_VECTOR field, store as string array
str_arr = []
for val in self._buffer[k]:
str_arr.append(json.dumps(val))
data[k] = pd.Series(str_arr, dtype=None)
elif field_schema.dtype == DataType.FLOAT_VECTOR:
elif field_schema.dtype in {DataType.BINARY_VECTOR, DataType.FLOAT_VECTOR}:
arr = []
for val in self._buffer[k]:
arr.append(np.array(val, dtype=np.dtype("float32")))
arr.append(np.array(val, dtype=NUMPY_TYPE_CREATOR[field_schema.dtype.name]))
data[k] = pd.Series(arr)
elif field_schema.dtype == DataType.BINARY_VECTOR:
elif field_schema.dtype in {DataType.FLOAT16_VECTOR, DataType.BFLOAT16_VECTOR}:
# special process for float16 vector, the self._buffer stores bytes for
# float16 vector, convert the bytes to uint8 array
arr = []
for val in self._buffer[k]:
arr.append(np.array(val, dtype=np.dtype("uint8")))
arr.append(
np.frombuffer(val, dtype=NUMPY_TYPE_CREATOR[field_schema.dtype.name])
)
data[k] = pd.Series(arr)
elif field_schema.dtype == DataType.ARRAY:
dt = NUMPY_TYPE_CREATOR[field_schema.element_type.name]
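To make the float16/bfloat16 handling above concrete: the buffer keeps those vectors as raw bytes, and each persistence path reinterprets the bytes with np.frombuffer. A small round-trip sketch follows; tensorflow is used only for the bfloat16 cast, mirroring examples/example_bulkwriter.py, and the exact target dtype of each writer path comes from NUMPY_TYPE_CREATOR, which is outside this diff.

import numpy as np
import tensorflow as tf  # only for the bfloat16 cast, as in examples/example_bulkwriter.py

vec = [0.56, 1.859, 6.55, 9.45]

fp16_arr = np.array(vec, dtype=np.float16)
bf16_arr = tf.cast(vec, dtype=tf.bfloat16).numpy()

# what the buffer holds for FLOAT16_VECTOR / BFLOAT16_VECTOR fields: the packed bytes
fp16_bytes = fp16_arr.tobytes()
bf16_bytes = bf16_arr.tobytes()

# the JSON writer recovers a float list from those bytes ...
print(np.frombuffer(fp16_bytes, dtype=np.float16).tolist())
print(np.frombuffer(bf16_bytes, dtype=bf16_arr.dtype).tolist())

# ... while the numpy/parquet writers can reinterpret the same bytes as fixed-width arrays
print(np.frombuffer(fp16_bytes, dtype=np.uint8))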
40 changes: 27 additions & 13 deletions pymilvus/bulk_writer/bulk_writer.py
@@ -116,14 +116,23 @@ def _throw(self, msg: str):
def _verify_vector(self, x: object, field: FieldSchema):
dtype = DataType(field.dtype)
validator = TYPE_VALIDATOR[dtype.name]
dim = field.params["dim"]
if not validator(x, dim):
self._throw(
f"Illegal vector data for vector field: '{field.name}',"
f" dim is not {dim} or type mismatch"
)

return len(x) * 4 if dtype == DataType.FLOAT_VECTOR else len(x)
if dtype != DataType.SPARSE_FLOAT_VECTOR:
dim = field.params["dim"]
try:
origin_list = validator(x, dim)
if dtype == DataType.FLOAT_VECTOR:
return origin_list, dim * 4 # for float vector, each dim occupies 4 bytes
if dtype == DataType.BINARY_VECTOR:
return origin_list, dim / 8 # for binary vector, 8 dim occupies 1 byte
return origin_list, dim * 2 # for float16 vector, each dim occupies 2 bytes
except MilvusException as e:
self._throw(f"Illegal vector data for vector field: '{field.name}': {e.message}")
else:
try:
validator(x)
return x, len(x) * 12 # for sparse vector, each key-value is int-float, 12 bytes
except MilvusException as e:
self._throw(f"Illegal vector data for vector field: '{field.name}': {e.message}")

def _verify_json(self, x: object, field: FieldSchema):
size = 0
@@ -187,11 +196,16 @@ def _verify_row(self, row: dict):
self._throw(f"The field '{field.name}' is missed in the row")

dtype = DataType(field.dtype)
if dtype in {DataType.BINARY_VECTOR, DataType.FLOAT_VECTOR}:
if isinstance(row[field.name], np.ndarray):
row[field.name] = row[field.name].tolist()

row_size = row_size + self._verify_vector(row[field.name], field)
if dtype in {
DataType.BINARY_VECTOR,
DataType.FLOAT_VECTOR,
DataType.FLOAT16_VECTOR,
DataType.BFLOAT16_VECTOR,
DataType.SPARSE_FLOAT_VECTOR,
}:
origin_list, byte_len = self._verify_vector(row[field.name], field)
row[field.name] = origin_list
row_size = row_size + byte_len
elif dtype == DataType.VARCHAR:
row_size = row_size + self._verify_varchar(row[field.name], field)
elif dtype == DataType.JSON:
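The byte counts returned by _verify_vector accumulate into the row_size used for the writer's buffer accounting. For the DIM = 512 vectors in the example script, the arithmetic from the comments above works out as follows (illustrative numbers only):

DIM = 512

float_vec_bytes  = DIM * 4    # float32: 4 bytes per dimension       -> 2048
binary_vec_bytes = DIM // 8   # binary: 8 dimensions packed per byte -> 64
fp16_vec_bytes   = DIM * 2    # float16/bfloat16: 2 bytes per dim    -> 1024

sparse_pairs = 10                      # a sparse row with 10 non-zero entries
sparse_vec_bytes = sparse_pairs * 12   # each int key + float value counted as 12 bytes -> 120

print(float_vec_bytes, binary_vec_bytes, fp16_vec_bytes, sparse_vec_bytes)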