Bulk cherry-picks #1243

Merged 1 commit on Nov 24, 2022
2 changes: 1 addition & 1 deletion docs/source/api/api.rst
@@ -1,7 +1,7 @@
.. _api:

API Reference
============
=============

.. toctree::
:maxdepth: 1
4 changes: 2 additions & 2 deletions docs/source/api/collection.rst
@@ -1,6 +1,6 @@
=========
==========
Collection
=========
==========

The schema of a collection is fixed when the collection is created. A collection schema consists of many fields
and must contain a vector field. A field is to a collection what a column is to an RDBMS table, and all data in one field has the same type.
2 changes: 1 addition & 1 deletion examples/example.py
@@ -113,7 +113,7 @@ def search(collection, vector_field, id_field, search_vectors):
"anns_field": vector_field,
"param": {"metric_type": _METRIC_TYPE, "params": {"nprobe": _NPROBE}},
"limit": _TOPK,
"expr": "id_field > 0"}
"expr": "id_field >= 0"}
results = collection.search(**search_param)
for i, result in enumerate(results):
print("\nSearch result for {}th vector: ".format(i))
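The changed expression simply widens the filter so an entity whose id_field is 0 is no longer excluded. For context, a minimal sketch of how such a boolean expression is combined with a vector search in pymilvus; the connection details, collection name "demo", and field names are illustrative assumptions, not taken from this PR:

# Sketch: ANN search with a boolean expression filter (assumed collection "demo",
# INT64 field "id_field", 8-dim FLOAT_VECTOR field "vector_field", already loaded).
import random
from pymilvus import Collection, connections

connections.connect(host="127.0.0.1", port="19530")
collection = Collection("demo")

search_param = {
    "data": [[random.random() for _ in range(8)]],   # one query vector
    "anns_field": "vector_field",
    "param": {"metric_type": "L2", "params": {"nprobe": 16}},
    "limit": 10,
    "expr": "id_field >= 0",   # ">= 0" keeps an entity whose primary key is 0
}
results = collection.search(**search_param)
for hit in results[0]:
    print(hit)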
112 changes: 53 additions & 59 deletions examples/example_bulkload.py → examples/example_bulkinsert.py
@@ -24,7 +24,8 @@
# rocksmq:
# path: /tmp/milvus/rdb_data
# storageType: local
MILVUS_DATA_PATH = "/tmp/milvus/data/"

FILES_PATH = "/tmp/milvus_bulkinsert/"

# Milvus service address
_HOST = '127.0.0.1'
@@ -42,6 +43,8 @@
# Vector field parameter
_DIM = 8

# to generate incrementing IDs
id_start = 1

# Create a Milvus connection
def create_connection():
@@ -62,7 +65,7 @@ def create_connection():

# Create a collection
def create_collection():
field1 = FieldSchema(name=_ID_FIELD_NAME, dtype=DataType.INT64, description="int64", is_primary=True, auto_id=True)
field1 = FieldSchema(name=_ID_FIELD_NAME, dtype=DataType.INT64, description="int64", is_primary=True, auto_id=False)
field2 = FieldSchema(name=_VECTOR_FIELD_NAME, dtype=DataType.FLOAT_VECTOR, description="float vector", dim=_DIM,
is_primary=False)
field3 = FieldSchema(name=_STR_FIELD_NAME, dtype=DataType.VARCHAR, description="string",
@@ -107,12 +110,15 @@ def create_partition(collection, partition_name):
# ]
# }
def gen_json_rowbased(num, path, tag):
global id_start
rows = []
for i in range(num):
rows.append({
_ID_FIELD_NAME: id_start,
_STR_FIELD_NAME: tag + str(i),
_VECTOR_FIELD_NAME: [round(random.random(), 6) for _ in range(_DIM)],
})
id_start = id_start + 1

data = {
"rows": rows,
@@ -121,7 +127,7 @@ def gen_json_rowbased(num, path, tag):
json.dump(data, json_file)


# Bulkload for row-based files, each file is converted to a task.
# For row-based files, each file is converted to a task; each call to do_bulk_insert() imports one file.
# The rootcoord maintains a task list and each idle datanode receives a task. If no datanode is available, the task
# is put into a pending list to wait; the max size of the pending list is 32. If the number of new tasks exceeds the
# spare capacity of the pending list, the do_bulk_insert() method returns an error.
@@ -135,27 +141,22 @@ def gen_json_rowbased(num, path, tag):
# But if segment.maxSize in milvus.yml is set to a small value, shardNum*2, shardNum*3, or even more segments
# could be generated.
def bulk_insert_rowbased(row_count_each_file, file_count, tag, partition_name = None):
# make sure the data path is exist
exist = os.path.exists(MILVUS_DATA_PATH)
if not exist:
os.mkdir(MILVUS_DATA_PATH)

file_names = []
for i in range(file_count):
file_names.append("rows_" + str(i) + ".json")
# make sure the files folder is created
os.makedirs(name=FILES_PATH, exist_ok=True)

task_ids = []
for filename in file_names:
print("Generate row-based file:", MILVUS_DATA_PATH + filename)
gen_json_rowbased(row_count_each_file, MILVUS_DATA_PATH + filename, tag)
print("Import row-based file:", filename)
for i in range(file_count):
file_path = FILES_PATH + "rows_" + str(i) + ".json"
print("Generate row-based file:", file_path)
gen_json_rowbased(row_count_each_file, file_path, tag)
print("Import row-based file:", file_path)
task_id = utility.do_bulk_insert(collection_name=_COLLECTION_NAME,
partition_name=partition_name,
files=[filename])
files=[file_path])
task_ids.append(task_id)
return wait_tasks_persisted(task_ids)

# wait all bulk insert tasks to be a certain state
# Wait for all bulk insert tasks to reach a certain state
# return the states of all the tasks, including failed tasks
def wait_tasks_to_state(task_ids, state_code):
wait_ids = task_ids
@@ -186,7 +187,7 @@ def wait_tasks_to_state(task_ids, state_code):

# Get bulk insert task state to check whether the data file has been parsed and persisted successfully.
# Persisted state doesn't mean the data is queryable; to query the data, you need to wait until the segment is
# loaded into memory.
# indexed successfully and loaded into memory.
def wait_tasks_persisted(task_ids):
print("=========================================================================================================")
states = wait_tasks_to_state(task_ids, BulkInsertState.ImportPersisted)
@@ -204,7 +205,10 @@ def wait_tasks_persisted(task_ids):

# Get bulk insert task state to check whether the data file has been indexed successfully.
# If the state of bulk insert task is BulkInsertState.ImportCompleted, that means the data is queryable.
def wait_tasks_competed(task_ids):
def wait_tasks_competed(tasks):
task_ids = []
for task in tasks:
task_ids.append(task.task_id)
print("=========================================================================================================")
states = wait_tasks_to_state(task_ids, BulkInsertState.ImportCompleted)
complete_count = 0
@@ -221,8 +225,8 @@ def wait_tasks_competed(task_ids):

# List all bulk insert tasks, including pending tasks, working tasks and finished tasks.
# The parameter 'limit' specifies how many of the latest tasks to return; if limit <= 0, all tasks are returned
def list_all_bulk_insert_tasks(limit):
tasks = utility.list_bulk_insert_tasks(limit)
def list_all_bulk_insert_tasks(collection_name=_COLLECTION_NAME, limit=0):
tasks = utility.list_bulk_insert_tasks(limit=limit, collection_name=collection_name)
print("=========================================================================================================")
print("list bulk insert tasks with limit", limit)
pending = 0
@@ -274,36 +278,31 @@ def release_collection(collection):


# ANN search
def search(collection, vector_field, search_vectors, partition_name = None, consistency_level = "Eventually"):
def search(collection, vector_field, search_vector, consistency_level = "Eventually"):
search_param = {
"data": search_vectors,
"data": [search_vector],
"anns_field": vector_field,
"param": {"metric_type": "L2", "params": {"nprobe": 10}},
"limit": 10,
"output_fields": [_STR_FIELD_NAME],
"consistency_level": consistency_level,
}
if partition_name != None:
search_param["partition_names"] = [partition_name]

results = collection.search(**search_param)
print("=========================================================================================================")
for i, result in enumerate(results):
if partition_name != None:
print("Search result for {}th vector in partition '{}': ".format(i, partition_name))
else:
print("Search result for {}th vector: ".format(i))

for j, res in enumerate(result):
print(f"\ttop{j}: {res}, {_STR_FIELD_NAME}: {res.entity.get(_STR_FIELD_NAME)}")
print("\thits count:", len(result))
result = results[0]
for j, res in enumerate(result):
print(f"\ttop{j}: {res}, {_STR_FIELD_NAME}: {res.entity.get(_STR_FIELD_NAME)}")
print("\thits count:", len(result))
print("=========================================================================================================\n")

# delete entities
def delete(collection, ids):
print("=========================================================================================================\n")
print("Delete these entities:", ids)
expr = _ID_FIELD_NAME + " in " + str(ids)
collection.delete(expr=expr)
print("=========================================================================================================\n")

# retrieve entities
def retrieve(collection, ids):
@@ -339,19 +338,17 @@ def main():
list_collections()

# do bulk_insert, wait all tasks finish persisting
task_ids = []
tasks = bulk_insert_rowbased(row_count_each_file=1000, file_count=1, tag="to_default_")
for task in tasks:
task_ids.append(task.task_id)
tasks = bulk_insert_rowbased(row_count_each_file=1000, file_count=3, tag="to_partition_", partition_name=a_partition)
for task in tasks:
task_ids.append(task.task_id)
all_tasks = []
tasks = bulk_insert_rowbased(row_count_each_file=1000, file_count=3, tag="to_default_")
all_tasks.extend(tasks)
tasks = bulk_insert_rowbased(row_count_each_file=1000, file_count=1, tag="to_partition_", partition_name=a_partition)
all_tasks.extend(tasks)

# wait until all tasks are completed (completed means queryable)
wait_tasks_competed(task_ids)
wait_tasks_competed(all_tasks)

# list all tasks
list_all_bulk_insert_tasks(len(task_ids))
list_all_bulk_insert_tasks()

# get the number of entities
get_entity_num(collection)
@@ -360,30 +357,27 @@ def main():
print("wait 5 seconds to load the data")
time.sleep(5)

# search in entire collection
vector = [round(random.random(), 6) for _ in range(_DIM)]
vectors = [vector]
print("Use a random vector to search in entire collection")
search(collection, _VECTOR_FIELD_NAME, vectors)
# pick some entities
delete_ids = [50, 100]
id_vectors = retrieve(collection, delete_ids)

# search in a partition
print("Use a random vector to search in partition:", a_partition)
search(collection, _VECTOR_FIELD_NAME, vectors, partition_name=a_partition)
# search in entire collection
for id_vector in id_vectors:
id = id_vector[_ID_FIELD_NAME]
vector = id_vector[_VECTOR_FIELD_NAME]
print("Search id:", id, ", compare this id to the top0 of search result, they are equal")
search(collection, _VECTOR_FIELD_NAME, vector)

# pick some entities to delete
delete_ids = []
for task in tasks:
delete_ids.append(task.ids[5])
id_vectors = retrieve(collection, delete_ids)
# delete the picked entities
delete(collection, delete_ids)

# search the delete entities to check existence, check the top0 of the search result
# search the deleted entities to check existence
for id_vector in id_vectors:
id = id_vector[_ID_FIELD_NAME]
vector = id_vector[_VECTOR_FIELD_NAME]
print("Search id:", id, ", compare this id to the top0 of search result, the entity with the id has been deleted")
print("Search id:", id, ", compare this id to the top0 result, they are not equal since the id has been deleted")
# here we use Strong consistency level to do search, because we need to make sure the delete operation is applied
search(collection, _VECTOR_FIELD_NAME, [vector], partition_name=None, consistency_level="Strong")
search(collection, _VECTOR_FIELD_NAME, vector, consistency_level="Strong")

# release memory
release_collection(collection)
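Taken together, the rewritten example now generates its own primary keys (auto_id=False), writes the JSON files under FILES_PATH, and passes full file paths to do_bulk_insert(). A condensed sketch of that row-based flow, assuming a running Milvus instance, an existing collection "demo" whose schema matches the generated rows, and a file location the Milvus deployment can read (all assumptions, not part of this PR):

# Condensed sketch of the row-based bulk insert flow from the example above.
import json
import os
import random
import time

from pymilvus import BulkInsertState, connections, utility

connections.connect(host="127.0.0.1", port="19530")

# 1. Generate a row-based JSON file that matches the collection schema.
os.makedirs("/tmp/milvus_bulkinsert/", exist_ok=True)
rows = [{"id": i,
         "name": "row_" + str(i),
         "vector": [round(random.random(), 6) for _ in range(8)]}
        for i in range(1000)]
file_path = "/tmp/milvus_bulkinsert/rows_0.json"
with open(file_path, "w") as json_file:
    json.dump({"rows": rows}, json_file)

# 2. One file per call; the returned task id identifies the import task.
task_id = utility.do_bulk_insert(collection_name="demo", files=[file_path])

# 3. Poll the task until it finishes (ImportCompleted means the data is queryable).
while True:
    state = utility.get_bulk_insert_state(task_id=task_id)
    if state.state in (BulkInsertState.ImportCompleted, BulkInsertState.ImportFailed):
        print("task", task_id, "finished:", state)
        break
    time.sleep(2)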
12 changes: 11 additions & 1 deletion examples/partition.py
@@ -119,10 +119,20 @@ def test_partition():

data = gen_data(default_nb)
print("insert data to partition")
partition.insert(data)
res = partition.insert(data)
collection.flush()
print(res.insert_count)
assert partition.is_empty is False
assert partition.num_entities == default_nb

print("start to create index")
index = {
"index_type": "IVF_FLAT",
"metric_type": "L2",
"params": {"nlist": 128},
}
collection.create_index(default_float_vec_field_name, index)

print("load partition")
partition.load()
topK = 5
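The added flush, insert_count check, and create_index() call suggest that, on the Milvus releases this client targets, a vector index has to exist before the partition can be loaded for search. A minimal sketch of the insert, flush, create_index, load ordering; the collection name, field names, and dimension are illustrative assumptions:

# Sketch: create the index before load() on an assumed collection "partition_demo".
import random
from pymilvus import Collection, CollectionSchema, DataType, FieldSchema, connections

connections.connect(host="127.0.0.1", port="19530")

schema = CollectionSchema([
    FieldSchema("id", DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema("vec", DataType.FLOAT_VECTOR, dim=8),
])
collection = Collection("partition_demo", schema)
partition = collection.create_partition("p1")

vectors = [[random.random() for _ in range(8)] for _ in range(128)]
res = partition.insert([vectors])       # column-based insert; "id" is auto-generated
collection.flush()                      # flush so num_entities reflects the insert
print(res.insert_count)                 # 128

collection.create_index("vec", {        # index first ...
    "index_type": "IVF_FLAT",
    "metric_type": "L2",
    "params": {"nlist": 128},
})
partition.load()                        # ... then load for search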
2 changes: 1 addition & 1 deletion pymilvus/client/abstract.py
@@ -369,7 +369,7 @@ def err_index(self):

def __str__(self):
return f"(insert count: {self._insert_cnt}, delete count: {self._delete_cnt}, upsert count: {self._upsert_cnt}, " \
"timestamp: {self._timestamp}, success count: {self.succ_count}, err count: {self.err_count})"
f"timestamp: {self._timestamp}, success count: {self.succ_count}, err count: {self.err_count})"

__repr__ = __str__

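The one-character fix above adds the missing f prefix on the continuation line; without it, Python's implicit string concatenation joins a plain literal whose braces are printed verbatim instead of being interpolated. A tiny standalone illustration (variable names are illustrative):

# Each literal in an implicit concatenation keeps its own prefix, so the second
# half is not an f-string unless it is prefixed with f as well.
timestamp, count = 42, 7

broken = f"(timestamp: {timestamp}, " \
         "count: {count})"              # -> "(timestamp: 42, count: {count})"
fixed = f"(timestamp: {timestamp}, " \
        f"count: {count})"              # -> "(timestamp: 42, count: 7)"

print(broken)
print(fixed)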