
[Bug]: [null & default] After using the bulk writer to import nullable data, the import is successful, but the load times out. #37162

Closed
zhuwenxing opened this issue Oct 25, 2024 · 8 comments

@zhuwenxing
Contributor

Is there an existing issue for this?

  • I have searched the existing issues

Environment

- Milvus version:master-20241025-80aa9ab4-amd64
- Deployment mode(standalone or cluster):
- MQ type(rocksmq, pulsar or kafka):    
- SDK version(e.g. pymilvus v2.0.0rc2):
- OS(Ubuntu or CentOS): 
- CPU/Memory: 
- GPU: 
- Others:

Current Behavior

[2024-10-25 19:31:57 - INFO - ci_test]: [initialize_milvus] Log cleaned up, start testing... (conftest.py:233)
[2024-10-25 19:31:57 - INFO - ci_test]: [setup_class] Start setup class... (client_base.py:40)
[2024-10-25 19:31:57 - INFO - ci_test]: *********************************** setup *********************************** (client_base.py:46)
[2024-10-25 19:31:57 - INFO - ci_test]: [setup_method] Start setup test case test_with_all_field_json_with_bulk_writer. (client_base.py:47)
-------------------------------- live log call ---------------------------------
[2024-10-25 19:31:57 - INFO - local_bulk_writer]: Data path created: /Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pycharm/bulk_writer (local_bulk_writer.py:77)
[2024-10-25 19:31:57 - INFO - local_bulk_writer]: Data path created: /Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pycharm/bulk_writer/63ad4e73-2965-42a7-9206-8709f00938f6 (local_bulk_writer.py:82)
[2024-10-25 19:31:57 - INFO - remote_bulk_writer]: Minio/S3 blob storage client successfully initialized (remote_bulk_writer.py:157)
[2024-10-25 19:31:57 - INFO - remote_bulk_writer]: Remote buffer writer initialized, target path: /bulk_data/63ad4e73-2965-42a7-9206-8709f00938f6 (remote_bulk_writer.py:123)
[2024-10-25 19:31:58 - INFO - local_bulk_writer]: Prepare to flush buffer, row_count: 1000, size: 1362390 (local_bulk_writer.py:109)
[2024-10-25 19:31:58 - INFO - local_bulk_writer]: Flush thread begin, name: Thread-10 (local_bulk_writer.py:116)
[2024-10-25 19:31:58 - INFO - local_bulk_writer]: Wait flush to finish (local_bulk_writer.py:120)
[2024-10-25 19:31:58 - INFO - bulk_buffer]: Successfully persist file /Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pycharm/bulk_writer/63ad4e73-2965-42a7-9206-8709f00938f6/1.json, row count: 1000 (buffer.py:221)
[2024-10-25 19:31:58 - INFO - remote_bulk_writer]: Prepare to upload '/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pycharm/bulk_writer/63ad4e73-2965-42a7-9206-8709f00938f6/1.json' to 'bulk_data/63ad4e73-2965-42a7-9206-8709f00938f6/1.json' (remote_bulk_writer.py:233)
[2024-10-25 19:31:58 - INFO - remote_bulk_writer]: Target bucket: 'full-text-search-perf-test' (remote_bulk_writer.py:235)
[2024-10-25 19:32:00 - INFO - remote_bulk_writer]: Upload file '/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pycharm/bulk_writer/63ad4e73-2965-42a7-9206-8709f00938f6/1.json' to 'bulk_data/63ad4e73-2965-42a7-9206-8709f00938f6/1.json' (remote_bulk_writer.py:257)
[2024-10-25 19:32:00 - INFO - remote_bulk_writer]: Successfully upload files: ['/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pycharm/bulk_writer/63ad4e73-2965-42a7-9206-8709f00938f6/1.json'] (remote_bulk_writer.py:303)
[2024-10-25 19:32:00 - INFO - local_bulk_writer]: Flush thread finished, name: Thread-10 (local_bulk_writer.py:146)
[2024-10-25 19:32:00 - INFO - local_bulk_writer]: Commit done with async=False (local_bulk_writer.py:124)
[2024-10-25 19:32:00 - INFO - local_bulk_writer]: Delete local directory '/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pycharm/bulk_writer/63ad4e73-2965-42a7-9206-8709f00938f6' (local_bulk_writer.py:88)
[2024-10-25 19:32:00 - INFO - ci_test]: before bulk load, there are 0 working tasks (utility_wrapper.py:25)
[2024-10-25 19:32:00 - INFO - ci_test]: files to load: ['bulk_data/63ad4e73-2965-42a7-9206-8709f00938f6/1.json'] (utility_wrapper.py:26)
[2024-10-25 19:32:01 - INFO - ci_test]: after bulk load, there are 1 working tasks (utility_wrapper.py:34)
[2024-10-25 19:32:01 - INFO - root]: bulk insert task ids:453464036885812051 (test_bulk_insert.py:1530)
[2024-10-25 19:32:01 - INFO - ci_test]: wait bulk load timeout is 300 (utility_wrapper.py:111)
[2024-10-25 19:32:01 - INFO - ci_test]: before waiting, there are 0 pending tasks (utility_wrapper.py:113)
[2024-10-25 19:32:17 - INFO - ci_test]: after waiting, there are 0 pending tasks (utility_wrapper.py:148)
[2024-10-25 19:32:17 - INFO - ci_test]: task state distribution: {'success': {453464036885812051}, 'failed': set(), 'in_progress': set()} (utility_wrapper.py:149)
[2024-10-25 19:32:17 - INFO - ci_test]: {453464036885812051: <Bulk insert state:
    - taskID          : 453464036885812051,
    - state           : Completed,
    - row_count       : 1000,
    - infos           : {'failed_reason': '', 'progress_percent': '100'},
    - id_ranges       : [],
    - create_ts       : 2024-10-25 19:32:00
>} (utility_wrapper.py:150)
[2024-10-25 19:32:17 - INFO - ci_test]: wait for bulk load tasks completed successfully, cost time: 16.629183053970337 (utility_wrapper.py:152)
[2024-10-25 19:32:17 - INFO - ci_test]: bulk insert state:True in 17.693009853363037 with states:{453464036885812051: <Bulk insert state:
    - taskID          : 453464036885812051,
    - state           : Completed,
    - row_count       : 1000,
    - infos           : {'failed_reason': '', 'progress_percent': '100'},
    - id_ranges       : [],
    - create_ts       : 2024-10-25 19:32:00
>} (test_bulk_insert.py:1535)
[2024-10-25 19:32:18 - INFO - ci_test]:  collection entities: 1000 (test_bulk_insert.py:1538)
[2024-10-25 19:35:20 - ERROR - pymilvus.decorators]: RPC error: [wait_for_loading_collection], <MilvusException: (code=1, message=wait for loading collection timeout, collection: bulk_insert_vZNCFkeQ)>, <Time:{'RPC start': '2024-10-25 19:32:20.702075', 'RPC error': '2024-10-25 19:35:20.729652'}> (decorators.py:140)
[2024-10-25 19:35:20 - WARNING - pymilvus.decorators]: Retry timeout: 180s (decorators.py:106)
[2024-10-25 19:35:20 - ERROR - pymilvus.decorators]: RPC error: [load_collection], <MilvusException: (code=1, message=Retry timeout: 180s, message=wait for loading collection timeout, collection: bulk_insert_vZNCFkeQ)>, <Time:{'RPC start': '2024-10-25 19:32:20.672642', 'RPC error': '2024-10-25 19:35:20.730139'}> (decorators.py:140)
[2024-10-25 19:35:20 - ERROR - ci_test]: Traceback (most recent call last):
  File "/Users/zilliz/workspace/milvus/tests/python_client/utils/api_request.py", line 32, in inner_wrapper
    res = func(*args, **_kwargs)
  File "/Users/zilliz/workspace/milvus/tests/python_client/utils/api_request.py", line 63, in api_request
    return func(*arg, **kwargs)
  File "/Users/zilliz/opt/anaconda3/envs/full_text_search/lib/python3.8/site-packages/pymilvus/orm/collection.py", line 429, in load
    conn.load_collection(
  File "/Users/zilliz/opt/anaconda3/envs/full_text_search/lib/python3.8/site-packages/pymilvus/decorators.py", line 141, in handler
    raise e from e
  File "/Users/zilliz/opt/anaconda3/envs/full_text_search/lib/python3.8/site-packages/pymilvus/decorators.py", line 137, in handler
    return func(*args, **kwargs)
  File "/Users/zilliz/opt/anaconda3/envs/full_text_search/lib/python3.8/site-packages/pymilvus/decorators.py", line 176, in handler
    return func(self, *args, **kwargs)
  File "/Users/zilliz/opt/anaconda3/envs/full_text_search/lib/python3.8/site-packages/pymilvus/decorators.py", line 107, in handler
    raise MilvusException(
pymilvus.exceptions.MilvusException: <MilvusException: (code=1, message=Retry timeout: 180s, message=wait for loading collection timeout, collection: bulk_insert_vZNCFkeQ)>
 (api_request.py:45)
[2024-10-25 19:35:20 - ERROR - ci_test]: (api_response) : <MilvusException: (code=1, message=Retry timeout: 180s, message=wait for loading collection timeout, collection: bulk_insert_vZNCFkeQ)> (api_request.py:46)
FAILED
testcases/test_bulk_insert.py:1444 (TestBulkInsert.test_with_all_field_json_with_bulk_writer[doc-True-True-1000-128-True])
self = <test_bulk_insert.TestBulkInsert object at 0x12fdc4c40>, auto_id = True
dim = 128, entities = 1000, enable_dynamic_field = True, sparse_format = 'doc'
nullable = True

    @pytest.mark.tags(CaseLabel.L3)
    @pytest.mark.parametrize("auto_id", [True, False])
    @pytest.mark.parametrize("dim", [128])  # 128
    @pytest.mark.parametrize("entities", [1000])  # 1000
    @pytest.mark.parametrize("enable_dynamic_field", [True, False])
    @pytest.mark.parametrize("nullable", [True, False])
    @pytest.mark.parametrize("sparse_format", ["doc", "coo"])
    def test_with_all_field_json_with_bulk_writer(self, auto_id, dim, entities, enable_dynamic_field, sparse_format, nullable):
        """
        collection schema 1: [pk, int64, float64, string float_vector]
        data file: vectors.npy and uid.npy,
        Steps:
        1. create collection
        2. import data
        3. verify
        """
        self._connect()
        fields = [
            cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
            cf.gen_int64_field(name=df.int_field, nullable=nullable),
            cf.gen_float_field(name=df.float_field, nullable=nullable),
            cf.gen_string_field(name=df.string_field, nullable=nullable),
            cf.gen_json_field(name=df.json_field, nullable=nullable),
            cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64, nullable=nullable),
            cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT, nullable=nullable),
            cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100, nullable=nullable),
            cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL, nullable=nullable),
            cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
            cf.gen_float16_vec_field(name=df.fp16_vec_field, dim=dim),
            cf.gen_bfloat16_vec_field(name=df.bf16_vec_field, dim=dim),
            cf.gen_sparse_vec_field(name=df.sparse_vec_field),
        ]
        c_name = cf.gen_unique_str("bulk_insert")
        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
        self.collection_wrap.init_collection(c_name, schema=schema)
        with RemoteBulkWriter(
            schema=schema,
            remote_path="bulk_data",
            connect_param=RemoteBulkWriter.ConnectParam(
                bucket_name=self.bucket_name,
                endpoint=self.minio_endpoint,
                access_key="minioadmin",
                secret_key="minioadmin",
            ),
            file_type=BulkFileType.JSON,
        ) as remote_writer:
            json_value = [
                # 1,
                # 1.0,
                # "1",
                # [1, 2, 3],
                # ["1", "2", "3"],
                # [1, 2, "3"],
                {"key": "value"},
            ]
            for i in range(entities):
                row = {
                    df.pk_field: i,
                    df.int_field: 1 if not (nullable and random.random() < 0.5) else None,
                    df.float_field: 1.0 if not (nullable and random.random() < 0.5) else None,
                    df.string_field: "string" if not (nullable and random.random() < 0.5) else None,
                    df.json_field: json_value[i%len(json_value)] if not (nullable and random.random() < 0.5) else None,
                    df.array_int_field: [1, 2] if not (nullable and random.random() < 0.5) else None,
                    df.array_float_field: [1.0, 2.0] if not (nullable and random.random() < 0.5) else None,
                    df.array_string_field: ["string1", "string2"] if not (nullable and random.random() < 0.5) else None,
                    df.array_bool_field: [True, False] if not (nullable and random.random() < 0.5) else None,
                    df.float_vec_field: cf.gen_vectors(1, dim)[0],
                    df.fp16_vec_field: cf.gen_vectors(1, dim, vector_data_type="FLOAT16_VECTOR")[0],
                    df.bf16_vec_field: cf.gen_vectors(1, dim, vector_data_type="BFLOAT16_VECTOR")[0],
                    df.sparse_vec_field: cf.gen_sparse_vectors(1, dim, sparse_format=sparse_format)[0]
                }
                if auto_id:
                    row.pop(df.pk_field)
                if enable_dynamic_field:
                    row["name"] = fake.name()
                    row["address"] = fake.address()
                remote_writer.append_row(row)
            remote_writer.commit()
            files = remote_writer.batch_files
        # import data
        for f in files:
            t0 = time.time()
            task_id, _ = self.utility_wrap.do_bulk_insert(
                collection_name=c_name, files=f
            )
            logging.info(f"bulk insert task ids:{task_id}")
            success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
                task_ids=[task_id], timeout=300
            )
            tt = time.time() - t0
            log.info(f"bulk insert state:{success} in {tt} with states:{states}")
            assert success
        num_entities = self.collection_wrap.num_entities
        log.info(f" collection entities: {num_entities}")
        assert num_entities == entities
        # verify imported data is available for search
        index_params = ct.default_index
        float_vec_fields = [f.name for f in fields if "vec" in f.name and "float" in f.name]
        sparse_vec_fields = [f.name for f in fields if "vec" in f.name and "sparse" in f.name]
        for f in float_vec_fields:
            self.collection_wrap.create_index(
                field_name=f, index_params=index_params
            )
        for f in sparse_vec_fields:
            self.collection_wrap.create_index(
                field_name=f, index_params=ct.default_sparse_inverted_index
            )
>       self.collection_wrap.load()

test_bulk_insert.py:1552: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../utils/wrapper.py:33: in inner_wrapper
    res, result = func(*args, **kwargs)
../base/collection_wrapper.py:108: in load
    check_result = ResponseChecker(res, func_name, check_task, check_items, check,
../check/func_check.py:34: in run
    result = self.assert_succ(self.succ, True)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <check.func_check.ResponseChecker object at 0x12fdc4580>, actual = False
expect = True

    def assert_succ(self, actual, expect):
>       assert actual is expect, f"Response of API {self.func_name} expect {expect}, but got {actual}"
E       AssertionError: Response of API load expect True, but got False

../check/func_check.py:116: AssertionError
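
For triage, here is a small diagnostic sketch (not part of the test above) that polls loading progress directly, to see whether the load is stuck at 0% or partway through. It assumes an open pymilvus connection and reuses the collection name from this run:

```python
# Diagnostic sketch (not part of the original test): poll loading
# progress to see where the load stalls. Assumes an open pymilvus
# connection; the collection name is taken from the failing run above.
import time
from pymilvus import utility

collection_name = "bulk_insert_vZNCFkeQ"
for _ in range(30):
    # e.g. {'loading_progress': '0%'}
    print(utility.loading_progress(collection_name))
    time.sleep(10)
```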

Expected Behavior

No response

Steps To Reproduce

No response

Milvus Log

cluster: 4am
namespace: chaos-testing
pod info:

full-text-search-perf-test-etcd-0                                 1/1     Running       0                  3d      10.104.16.224   4am-node21   <none>           <none>
full-text-search-perf-test-etcd-1                                 1/1     Running       0                  3d      10.104.33.123   4am-node36   <none>           <none>
full-text-search-perf-test-etcd-2                                 1/1     Running       0                  3d      10.104.26.117   4am-node32   <none>           <none>
full-text-search-perf-test-kafka-0                                2/2     Running       1 (3d ago)         3d      10.104.30.110   4am-node38   <none>           <none>
full-text-search-perf-test-kafka-1                                2/2     Running       0                  3d      10.104.33.128   4am-node36   <none>           <none>
full-text-search-perf-test-kafka-2                                2/2     Running       0                  3d      10.104.16.228   4am-node21   <none>           <none>
full-text-search-perf-test-kafka-exporter-6848d7f484-dfqn5        1/1     Running       4 (3d ago)         3d      10.104.16.218   4am-node21   <none>           <none>
full-text-search-perf-test-kafka-zookeeper-0                      1/1     Running       0                  3d      10.104.16.225   4am-node21   <none>           <none>
full-text-search-perf-test-kafka-zookeeper-1                      1/1     Running       0                  3d      10.104.26.119   4am-node32   <none>           <none>
full-text-search-perf-test-kafka-zookeeper-2                      1/1     Running       0                  3d      10.104.33.129   4am-node36   <none>           <none>
full-text-search-perf-test-milvus-datanode-77ffcb949f-sfzgs       1/1     Running       0                  7h51m   10.104.13.205   4am-node16   <none>           <none>
full-text-search-perf-test-milvus-datanode-77ffcb949f-x59ft       1/1     Running       0                  7h52m   10.104.5.105    4am-node12   <none>           <none>
full-text-search-perf-test-milvus-datanode-77ffcb949f-zchbl       1/1     Running       0                  7h53m   10.104.6.179    4am-node13   <none>           <none>
full-text-search-perf-test-milvus-indexnode-56974cdbb4-d6np2      1/1     Running       0                  7h53m   10.104.23.74    4am-node27   <none>           <none>
full-text-search-perf-test-milvus-indexnode-56974cdbb4-g5qt7      1/1     Running       0                  7h55m   10.104.1.77     4am-node10   <none>           <none>
full-text-search-perf-test-milvus-indexnode-56974cdbb4-qwdq9      1/1     Running       0                  7h54m   10.104.13.200   4am-node16   <none>           <none>
full-text-search-perf-test-milvus-mixcoord-79b5f96999-tbg5b       1/1     Running       0                  7h53m   10.104.6.177    4am-node13   <none>           <none>
full-text-search-perf-test-milvus-proxy-7d9bf85645-dd8jw          1/1     Running       0                  7h53m   10.104.6.178    4am-node13   <none>           <none>
full-text-search-perf-test-milvus-querynode-0-65b86bfb76-mh28l    1/1     Running       0                  7h50m   10.104.13.206   4am-node16   <none>           <none>
full-text-search-perf-test-milvus-querynode-0-65b86bfb76-mh4rr    1/1     Running       0                  7h42m   10.104.1.110    4am-node10   <none>           <none>
full-text-search-perf-test-minio-0                                1/1     Running       0                  3d      10.104.16.226   4am-node21   <none>           <none>
full-text-search-perf-test-minio-1                                1/1     Running       0                  3d      10.104.33.125   4am-node36   <none>           <none>
full-text-search-perf-test-minio-2                                1/1     Running       0                  3d      10.104.26.118   4am-node32   <none>           <none>
full-text-search-perf-test-minio-3                                1/1     Running       0                  3d      10.104.30.109   4am-node38   <none>           <none>

Attachment: log.log

For the full log, go to Loki.

Anything else?

No response

@zhuwenxing zhuwenxing added kind/bug Issues or changes related a bug needs-triage Indicates an issue or PR lacks a `triage/foo` label and requires one. labels Oct 25, 2024
@smellthemoon
Contributor

Related to #37128.

@binbinlv
Contributor

/assign @smellthemoon

@binbinlv binbinlv changed the title [Bug]: After using the bulk writer to import nullable data, the import is successful, but the load times out. [Bug]: [null & default] After using the bulk writer to import nullable data, the import is successful, but the load times out. Oct 25, 2024
@binbinlv
Contributor

binbinlv commented Oct 25, 2024

@smellthemoon Per issue #36265, the bulk writer does not support None/default by design at this phase. But it seems the bulk writer now does support None in JSON format (though not in CSV format, see milvus-io/pymilvus#2313).

So what is the expected behavior for the bulk writer with respect to None and default values?
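
To make the format question concrete, here is a minimal illustrative sketch (field names hypothetical) of why JSON can carry None naturally while CSV cannot:

```python
# Illustrative only: JSON has a native null token, CSV does not.
# Field names ("pk", "int_field") are hypothetical.
import json

rows = [
    {"pk": 1, "int_field": 7},     # value present
    {"pk": 2, "int_field": None},  # nullable field -> JSON null
]
print(json.dumps({"rows": rows}))
# {"rows": [{"pk": 1, "int_field": 7}, {"pk": 2, "int_field": null}]}

# The CSV line for row 2 would be "2," -- an empty cell that could mean
# null, an empty string, or "apply the field's default", so None/default
# semantics have to be pinned down per file format.
```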

@binbinlv binbinlv added triage/accepted Indicates an issue or PR is ready to be actively worked on. and removed needs-triage Indicates an issue or PR lacks a `triage/foo` label and requires one. labels Oct 25, 2024
@binbinlv
Contributor

/unassign @yanliang567

@yanliang567 yanliang567 added this to the 2.5.0 milestone Oct 26, 2024
@smellthemoon
Contributor

PR merged, could you help verify? @zhuwenxing

@xiaofan-luan
Collaborator

/assign @zhuwenxing

@zhuwenxing
Contributor Author

/assign @binbinlv

The bulk writer now supports nullable fields; please add the corresponding case and verify this issue.
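
For reference, a minimal verification sketch along the lines of the failing test above, pared down to a single nullable scalar field. It assumes pymilvus >= 2.5 (where FieldSchema accepts nullable=True), a Milvus instance at localhost:19530, and placeholder MinIO endpoint/bucket values; it is not the test case that was actually added:

```python
# Sketch only: import nullable data via RemoteBulkWriter, then load.
# Assumptions: pymilvus >= 2.5 (FieldSchema nullable=True), Milvus at
# localhost:19530; <bucket> and <minio-endpoint> are placeholders.
import random
import time

from pymilvus import (
    Collection, CollectionSchema, DataType, FieldSchema, connections, utility,
)
from pymilvus.bulk_writer import BulkFileType, RemoteBulkWriter

connections.connect(host="localhost", port="19530")

dim = 8
schema = CollectionSchema(
    fields=[
        FieldSchema("pk", DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema("int_field", DataType.INT64, nullable=True),
        FieldSchema("vec", DataType.FLOAT_VECTOR, dim=dim),
    ],
    auto_id=True,
)
collection = Collection("bulk_nullable_check", schema=schema)

with RemoteBulkWriter(
    schema=schema,
    remote_path="bulk_data",
    connect_param=RemoteBulkWriter.ConnectParam(
        bucket_name="<bucket>",
        endpoint="<minio-endpoint>",
        access_key="minioadmin",
        secret_key="minioadmin",
    ),
    file_type=BulkFileType.JSON,
) as writer:
    for _ in range(1000):
        writer.append_row({
            # roughly half the rows carry an explicit None
            "int_field": None if random.random() < 0.5 else 1,
            "vec": [random.random() for _ in range(dim)],
        })
    writer.commit()
    files = writer.batch_files

for f in files:
    task_id = utility.do_bulk_insert(collection_name=collection.name, files=f)
    state = utility.get_bulk_insert_state(task_id)
    while state.state_name not in ("Completed", "Failed"):
        time.sleep(2)
        state = utility.get_bulk_insert_state(task_id)
    assert state.state_name == "Completed", state

# Before the fix, the import above succeeded but this load timed out.
collection.create_index(
    "vec",
    {"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 16}},
)
collection.load(timeout=300)
print("loaded, entities:", collection.num_entities)
```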

@binbinlv
Contributor

binbinlv commented Nov 8, 2024

Verified and fixed:

milvus: master-20241107-994f52fa-amd64
pymilvus: 2.5.0rc107

results:

(screenshot: verification run passing)

@binbinlv binbinlv closed this as completed Nov 8, 2024