
[Bug]: When using the bulk writer to generate CSV files, if there are bf16 and float16 vector types present, a JSON dump error will occur. #2276

Open
zhuwenxing opened this issue Sep 26, 2024 · 14 comments
Labels: kind/bug Something isn't working

@zhuwenxing
Contributor

Is there an existing issue for this?

  • I have searched the existing issues

Describe the bug

When using the bulk writer to generate CSV files, if bf16 and float16 vector types are present, a JSON dump error occurs:

Failed to convert field float16_vec_field value to string, error: Object of type ndarray is not JSON serializable
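
For context, a minimal sketch of this failure mode (not the reporter's code; the field name float16_vec_field and the 8-dim vector are assumed for illustration): json.dumps cannot serialize a NumPy array, so the writer has to turn the vector into a plain Python list first.

import json
import numpy as np

vec = np.random.random(8).astype(np.float16)  # a float16 vector as handed to the writer

try:
    json.dumps(vec)  # raises TypeError: Object of type ndarray is not JSON serializable
except TypeError as e:
    print(f"Failed to convert field float16_vec_field value to string, error: {e}")

print(json.dumps([float(x) for x in vec]))  # converting to plain Python floats serializes fine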

Expected Behavior

The bulk writer should generate the CSV files successfully when bf16 and float16 vector fields are present.

Steps/Code To Reproduce behavior

No response

Environment details

- Hardware/Software conditions (OS, CPU, GPU, Memory):
- Method of installation (Docker, or from source):
- Milvus version (v0.3.1, or v0.4.0):
- Milvus configuration (Settings you made in `server_config.yaml`):

Anything else?

No response

@zhuwenxing zhuwenxing added the kind/bug Something isn't working label Sep 26, 2024
@zhuwenxing zhuwenxing changed the title [Bug]: When using the bulk writer to format CSV files, if there are bf16 and float16 vector types present, a JSON dump error will occur. [Bug]: When using the bulk writer to generate CSV files, if there are bf16 and float16 vector types present, a JSON dump error will occur. Sep 26, 2024
@zhuwenxing
Contributor Author

/assign @OxalisCu

@sre-ci-robot

@zhuwenxing: GitHub didn't allow me to assign the following users: OxalisCu.

Note that only milvus-io members, repo collaborators and people who have commented on this issue/PR can be assigned. Additionally, issues/PRs can only have 10 assignees at the same time.
For more information please see the contributor guide

In response to this:

/assign @OxalisCu

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@zhuwenxing
Contributor Author

/assign @yhmo
Can you help me assign this issue?

@zhuwenxing
Contributor Author

https://github.com/milvus-io/pymilvus/blob/master/examples/example_bulkinsert_csv.py
@OxalisCu
This example is missing many data types and could be extended to cover all of them. It also lacks any usage of the bulk writer, so please add that as well.

@zhuwenxing
Contributor Author

def gen_csv_rowbased(num, path, partition_name, sep=","):
    global id_start
    header = [_ID_FIELD_NAME, _JSON_FIELD_NAME, _VECTOR_FIELD_NAME, _VARCHAR_FIELD_NAME, "dynamic_field"]
    rows = []
    for i in range(num):
        rows.append([
            id_start, # id field
            json.dumps({"Number": id_start, "Name": "book_"+str(id_start)}), # json field
            json.dumps([round(random.random(), 6) for _ in range(_DIM)]), # vector field
            "{}_{}".format(partition_name, id_start) if partition_name is not None else "description_{}".format(id_start), # varchar field
            id_start, # no field matches this value, this value will be put into dynamic field
        ])
        id_start = id_start + 1
    data = [header] + rows
    with open(path, "w") as f:
        writer = csv.writer(f, delimiter=sep)
        for row in data:
            writer.writerow(row)

@OxalisCu @yhmo

Also, is the usage of the dynamic schema correct in this example? My understanding is that it should be written like this, right?

def gen_csv_rowbased(num, path, partition_name, sep=","):
    global id_start
    header = [_ID_FIELD_NAME, _JSON_FIELD_NAME, _VECTOR_FIELD_NAME, _VARCHAR_FIELD_NAME, "$meta"]
    rows = []
    for i in range(num):
        rows.append([
            id_start, # id field
            json.dumps({"Number": id_start, "Name": "book_"+str(id_start)}), # json field
            json.dumps([round(random.random(), 6) for _ in range(_DIM)]), # vector field
            "{}_{}".format(partition_name, id_start) if partition_name is not None else "description_{}".format(id_start), # varchar field
             json.dumps({"dynameic_fiield_1": 1, "dynamic_field_2":2}), # no field matches this value, this value will be put into dynamic field
        ])
        id_start = id_start + 1
    data = [header] + rows
    with open(path, "w") as f:
        writer = csv.writer(f, delimiter=sep)
        for row in data:
            writer.writerow(row)

@zhuwenxing
Contributor Author

A good user experience would be: if my data is a DataFrame that can be inserted into Milvus through the insert interface, then the CSV file generated from that DataFrame with df.to_csv(index=False) can be used directly as a bulk-insert file.
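
A sketch of the workflow being asked for (column names, the 8-dim vector, and the file path are illustrative, not from the example; the commented-out insert assumes a matching collection already exists):

import random
import pandas as pd

# Illustrative schema: an int64 primary key, a varchar field and an 8-dim float vector.
df = pd.DataFrame({
    "id": range(3),
    "title": [f"book_{i}" for i in range(3)],
    "vector": [[round(random.random(), 6) for _ in range(8)] for _ in range(3)],
})

# collection.insert(df)                        # works through the insert interface
df.to_csv("/tmp/bulk_data.csv", index=False)   # the wish: this file should be directly usable for do_bulk_insert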

@OxalisCu
Contributor

(quoting the request above to extend example_bulkinsert_csv.py with all data types and bulk writer usage)

OK, thank you for reporting this bug, I will fix it soon.

example_bulkinsert_csv.py does lack tests for all data types and bulk writers. The reason is that I overlooked this issue when writing tests based on example_bulkinsert_json.py.

I noticed that example_bulkwriter.py covers the bulk writer for various file formats. I would like to add a CSV bulk writer test to that file. Do you think that is okay?
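
For reference, a rough sketch of what such a test might look like (assuming the bulk_writer module exposes LocalBulkWriter and BulkFileType.CSV as in current master; the field names, dimension, and output path are placeholders, and the bf16 field is omitted for brevity):

import numpy as np
from pymilvus import CollectionSchema, FieldSchema, DataType
from pymilvus.bulk_writer import BulkFileType, LocalBulkWriter

schema = CollectionSchema(fields=[
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="float16_vec_field", dtype=DataType.FLOAT16_VECTOR, dim=8),
])

# Write rows into local CSV chunks; appending an np.float16 array exercises the code path reported above.
writer = LocalBulkWriter(schema=schema, local_path="/tmp/bulk_csv", file_type=BulkFileType.CSV)
for i in range(10):
    writer.append_row({
        "id": i,
        "float16_vec_field": np.random.random(8).astype(np.float16),
    })
writer.commit()
print(writer.batch_files)  # paths of the generated CSV files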

@OxalisCu
Contributor

(quoting the two gen_csv_rowbased snippets and the dynamic-schema question from the comment above)

The code here is correct; it is only there to verify a small piece of functionality.

As long as the collection allows the dynamic field, Milvus will automatically put any column that is not in the schema (such as 'dynamic_field') into the dynamic field ($meta) when parsing, so both of the snippets above are supported. This logic is at line 102 of row_parser.go:

https://github.com/milvus-io/milvus/blob/7ff41697f9131fcc41107edc3770a6d28c1a0394/internal/util/importutilv2/csv/row_parser.go
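
For illustration, the two header layouts would produce CSV content roughly like the following (field names and values are abbreviated placeholders; in the first variant the parser folds the extra dynamic_field column into $meta, in the second the $meta JSON is written directly):

id,json,vector,varchar,dynamic_field
1,"{""Number"": 1, ""Name"": ""book_1""}","[0.1, 0.2]",description_1,1

id,json,vector,varchar,$meta
1,"{""Number"": 1, ""Name"": ""book_1""}","[0.1, 0.2]",description_1,"{""dynamic_field_1"": 1, ""dynamic_field_2"": 2}"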

@zhuwenxing
Contributor Author

(quoting the gen_csv_rowbased snippets and @OxalisCu's reply above)

However, with this method each row can only contain the same dynamic columns (I understand this approach to be more like a fixed column that simply is not defined in the schema). In many cases the dynamic column names differ from row to row, and this method cannot handle that situation.
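
For example, with the $meta layout each row can carry its own set of dynamic keys, which a fixed extra column cannot express (the key names below are made up for illustration):

id,json,vector,varchar,$meta
1,"{""Number"": 1}","[0.1, 0.2]",description_1,"{""publisher"": ""pub_a""}"
2,"{""Number"": 2}","[0.3, 0.4]",description_2,"{""price"": 9.9, ""stock"": 3}"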

@OxalisCu
Contributor

(quoting the exchange above about dynamic field handling)

Indeed, the second way of writing is better. I will change it in example.csv

@CaoHaiNam
Contributor

@OxalisCu
I see that pull request #2281 has already handled this issue, but when I run examples/example_bulkinsert_csv.py after pulling the latest source code, the file does not execute successfully. This is the message I get:

RPC error: [do_bulk_insert], <MilvusException: (code=2100, message=unexpect file type, files=[milvus_bulkinsert/csv_0/csv_0.csv]: importing data failed)>, <Time:{'RPC start': '2024-10-22 09:26:31.119571', 'RPC error': '2024-10-22 09:26:31.122147'}>
Traceback (most recent call last):
  File "/home/namch/pymilvus/master/pymilvus/examples/example_bulkinsert_csv.py", line 403, in <module>
    main(has_partition_key)
    ~~~~^^^^^^^^^^^^^^^^^^^
  File "/home/namch/pymilvus/master/pymilvus/examples/example_bulkinsert_csv.py", line 349, in main
    bulk_insert_rowbased(row_count_per_file=row_count_per_file, file_count=1)
    ~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/namch/pymilvus/master/pymilvus/examples/example_bulkinsert_csv.py", line 136, in bulk_insert_rowbased
    task_id = utility.do_bulk_insert(collection_name=_COLLECTION_NAME,
                                 partition_name=partition_name,
                                 files=remote_files,
                                 sep=sep)
  File "/home/namch/pymilvus/dev/pymilvus/pymilvus/orm/utility.py", line 841, in do_bulk_insert
    return _get_connection(using).do_bulk_insert(
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^
        collection_name, partition_name, files, timeout=timeout, **kwargs
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/home/namch/pymilvus/dev/pymilvus/pymilvus/decorators.py", line 141, in handler
    raise e from e
  File "/home/namch/pymilvus/dev/pymilvus/pymilvus/decorators.py", line 137, in handler
    return func(*args, **kwargs)
  File "/home/namch/pymilvus/dev/pymilvus/pymilvus/decorators.py", line 176, in handler
    return func(self, *args, **kwargs)
  File "/home/namch/pymilvus/dev/pymilvus/pymilvus/decorators.py", line 116, in handler
    raise e from e
  File "/home/namch/pymilvus/dev/pymilvus/pymilvus/decorators.py", line 86, in handler
    return func(*args, **kwargs)
  File "/home/namch/pymilvus/dev/pymilvus/pymilvus/client/grpc_handler.py", line 1698, in do_bulk_insert
    check_status(response.status)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^
  File "/home/namch/pymilvus/dev/pymilvus/pymilvus/client/utils.py", line 63, in check_status
    raise MilvusException(status.code, status.reason, status.error_code)
pymilvus.exceptions.MilvusException: <MilvusException: (code=2100, message=unexpect file type, files=[milvus_bulkinsert/csv_0/csv_0.csv]: importing data failed)>

Have you seen this error before? I’d be happy to work on fixing it as my first contribution if it’s still unresolved.

Thanks!

@OxalisCu
Contributor

OxalisCu commented Oct 22, 2024

I haven't reproduced this problem. The error suggests that your Milvus build does not support the CSV format. Please check whether Milvus was compiled from the latest code on the master branch: https://github.com/milvus-io/milvus

@CaoHaiNam
Contributor

Why should I check Milvus again?
I've already run Milvus in standalone mode, and the service is working well. However, the file mentioned, https://github.com/milvus-io/pymilvus/blob/master/examples/example_bulkinsert_csv.py, does not seem to use the milvus library at all.

I'm wondering whether there's an issue with bulk inserting CSV files, or whether Milvus currently doesn't support bulk insert from CSV. Could you clarify this?

@OxalisCu
Contributor

Milvus added support for the CSV format in August (milvus-io/milvus#34938). The currently released version does not support importing CSV files.

From the error message, my guess is that you are not running the latest Milvus code, so please build Milvus from the latest source and try again.

@XuanYang-cn XuanYang-cn added this to the PyMilvus 2.5.0 milestone Nov 19, 2024