From ee87a02c64d4fb7069d02d44aa0158a6c3ea15e8 Mon Sep 17 00:00:00 2001 From: groot Date: Fri, 18 Nov 2022 14:33:04 +0800 Subject: [PATCH] Modify bulkinsert example (#1228) Signed-off-by: yhmo Signed-off-by: yhmo --- ...mple_bulkload.py => example_bulkinsert.py} | 30 ++++++++----------- pymilvus/orm/utility.py | 23 ++++++++++++++ 2 files changed, 36 insertions(+), 17 deletions(-) rename examples/{example_bulkload.py => example_bulkinsert.py} (95%) diff --git a/examples/example_bulkload.py b/examples/example_bulkinsert.py similarity index 95% rename from examples/example_bulkload.py rename to examples/example_bulkinsert.py index a535835a8..85255b4f7 100644 --- a/examples/example_bulkload.py +++ b/examples/example_bulkinsert.py @@ -24,7 +24,8 @@ # rocksmq: # path: /tmp/milvus/rdb_data # storageType: local -MILVUS_DATA_PATH = "/tmp/milvus/data/" + +FILES_PATH = "/tmp/milvus_bulkinsert/" # Milvus service address _HOST = '127.0.0.1' @@ -126,7 +127,7 @@ def gen_json_rowbased(num, path, tag): json.dump(data, json_file) -# Bulkload for row-based files, each file is converted to a task. +# For row-based files, each file is converted to a task. Each time you can call do_bulk_insert() to insert one file. # The rootcoord maintains a task list, each idle datanode will receive a task. If no datanode available, the task will # be put into pending list to wait, the max size of pending list is 32. If new tasks count exceed spare quantity of # pending list, the do_bulk_insert() method will return error. @@ -140,27 +141,22 @@ def gen_json_rowbased(num, path, tag): # But if the segment.maxSize of milvus.yml is set to a small value, there could be shardNum*2, shardNum*3 segments # generated, or even more. def bulk_insert_rowbased(row_count_each_file, file_count, tag, partition_name = None): - # make sure the data path is exist - exist = os.path.exists(MILVUS_DATA_PATH) - if not exist: - os.mkdir(MILVUS_DATA_PATH) - - file_names = [] - for i in range(file_count): - file_names.append("rows_" + str(i) + ".json") + # make sure the files folder is created + os.makedirs(name=FILES_PATH, exist_ok=True) task_ids = [] - for filename in file_names: - print("Generate row-based file:", MILVUS_DATA_PATH + filename) - gen_json_rowbased(row_count_each_file, MILVUS_DATA_PATH + filename, tag) - print("Import row-based file:", filename) + for i in range(file_count): + file_path = FILES_PATH + "rows_" + str(i) + ".json" + print("Generate row-based file:", file_path) + gen_json_rowbased(row_count_each_file, file_path, tag) + print("Import row-based file:", file_path) task_id = utility.do_bulk_insert(collection_name=_COLLECTION_NAME, partition_name=partition_name, - files=[filename]) + files=[file_path]) task_ids.append(task_id) return wait_tasks_persisted(task_ids) -# wait all bulk insert tasks to be a certain state +# Wait all bulk insert tasks to be a certain state # return the states of all the tasks, including failed task def wait_tasks_to_state(task_ids, state_code): wait_ids = task_ids @@ -191,7 +187,7 @@ def wait_tasks_to_state(task_ids, state_code): # Get bulk insert task state to check whether the data file has been parsed and persisted successfully. # Persisted state doesn't mean the data is queryable, to query the data, you need to wait until the segment is -# loaded into memory. +# indexed successfully and loaded into memory. def wait_tasks_persisted(task_ids): print("=========================================================================================================") states = wait_tasks_to_state(task_ids, BulkInsertState.ImportPersisted) diff --git a/pymilvus/orm/utility.py b/pymilvus/orm/utility.py index cdd8f63e5..5d7096082 100644 --- a/pymilvus/orm/utility.py +++ b/pymilvus/orm/utility.py @@ -670,6 +670,17 @@ def do_bulk_insert(collection_name: str, files: list, partition_name=None, timeo :raises BaseException: If collection_name doesn't exist. :raises BaseException: If the files input is illegal. + + :example: + >>> from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility + >>> connections.connect() + >>> schema = CollectionSchema([ + ... FieldSchema("film_id", DataType.INT64, is_primary=True), + ... FieldSchema("films", dtype=DataType.FLOAT_VECTOR, dim=2) + ... ]) + >>> collection = Collection("test_collection_bulk_insert", schema) + >>> task_id = utility.do_bulk_insert(collection_name=collection.name, files=['data.json']) + >>> print(task_id) """ return _get_connection(using).do_bulk_insert(collection_name, partition_name, files, timeout=timeout, **kwargs) @@ -682,6 +693,13 @@ def get_bulk_insert_state(task_id, timeout=None, using="default", **kwargs) -> B :return: BulkInsertState :rtype: BulkInsertState + + :example: + >>> from pymilvus import connections, utility, BulkInsertState + >>> connections.connect() + >>> state = utility.get_bulk_insert_state(task_id=id) # the id is returned by do_bulk_insert() + >>> if state.state == BulkInsertState.ImportFailed or state.state == BulkInsertState.ImportFailedAndCleaned: + >>> print("task id:", state.task_id, "failed, reason:", state.failed_reason) """ return _get_connection(using).get_bulk_insert_state(task_id, timeout=timeout, **kwargs) @@ -698,6 +716,11 @@ def list_bulk_insert_tasks(limit=0, collection_name=None, timeout=None, using="d :return: list[BulkInsertState] :rtype: list[BulkInsertState] + :example: + >>> from pymilvus import connections, utility, BulkInsertState + >>> connections.connect() + >>> tasks = utility.list_bulk_insert_tasks(collection_name=collection_name) + >>> print(tasks) """ return _get_connection(using).list_bulk_insert_tasks(limit, collection_name, timeout=timeout, **kwargs)