Commit 4f64af7

fix: Fix the f16 and bf16 dump error & add all type test for csv & csv supports null (milvus-io#2281)

milvus-io#2276

---------

Signed-off-by: OxalisCu <[email protected]>
OxalisCu authored Oct 21, 2024
1 parent c6e0326 commit 4f64af7
Showing 4 changed files with 77 additions and 41 deletions.
6 changes: 3 additions & 3 deletions examples/example_bulkinsert_csv.py
@@ -99,15 +99,15 @@ def create_partition(collection, partition_name):

 def gen_csv_rowbased(num, path, partition_name, sep=","):
     global id_start
-    header = [_ID_FIELD_NAME, _JSON_FIELD_NAME, _VECTOR_FIELD_NAME, _VARCHAR_FIELD_NAME, "dynamic_field"]
+    header = [_ID_FIELD_NAME, _JSON_FIELD_NAME, _VECTOR_FIELD_NAME, _VARCHAR_FIELD_NAME, _DYNAMIC_FIELD_NAME]
     rows = []
     for i in range(num):
         rows.append([
             id_start, # id field
             json.dumps({"Number": id_start, "Name": "book_"+str(id_start)}), # json field
-            json.dumps([round(random.random(), 6) for _ in range(_DIM)]), # vector field
+            [round(random.random(), 6) for _ in range(_DIM)], # vector field
             "{}_{}".format(partition_name, id_start) if partition_name is not None else "description_{}".format(id_start), # varchar field
-            id_start, # no field matches this value, this value will be put into dynamic field
+            json.dumps({"dynamic_field": id_start}), # no field matches this value, this value will be put into dynamic field
         ])
         id_start = id_start + 1
     data = [header] + rows
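For context on the vector-field change above: `csv.writer` stringifies a plain Python list with `str()`, which already produces the bracketed, comma-separated form of a JSON array, which is presumably why the explicit `json.dumps` wrapper could be dropped. A quick standalone sketch:

```python
import csv, io, json

# str() on a list of floats already matches the JSON array format,
# so csv.writer can take the list directly (it quotes the field
# because the value contains the separator).
buf = io.StringIO()
csv.writer(buf).writerow([[0.1, 0.2, 0.3]])
print(buf.getvalue().strip())       # "[0.1, 0.2, 0.3]"
print(json.dumps([0.1, 0.2, 0.3]))  # [0.1, 0.2, 0.3]
```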
1 change: 1 addition & 0 deletions examples/example_bulkwriter.py
@@ -484,6 +484,7 @@ def cloud_bulkinsert():
         BulkFileType.JSON,
         BulkFileType.NUMPY,
         BulkFileType.PARQUET,
+        BulkFileType.CSV,
     ]
 
     schema = build_simple_collection()
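With `BulkFileType.CSV` added to the list, the example now exercises CSV output alongside the other formats. A minimal sketch of such a call, assuming the usual `LocalBulkWriter` constructor; how the CSV `sep`/`nullkey` options reach the buffer's `self._config` is not shown in this diff, so that part is deliberately omitted here:

```python
from pymilvus import CollectionSchema, DataType, FieldSchema, LocalBulkWriter
from pymilvus.bulk_writer import BulkFileType

schema = CollectionSchema(fields=[
    FieldSchema("id", DataType.INT64, is_primary=True),
    FieldSchema("vector", DataType.FLOAT_VECTOR, dim=8),
])

# file_type selects the on-disk format; any sep/nullkey config argument
# would be an assumption to verify against the writer's signature.
writer = LocalBulkWriter(
    schema=schema,
    local_path="/tmp/bulk_data",
    file_type=BulkFileType.CSV,
)
writer.append_row({"id": 1, "vector": [0.1] * 8})
writer.commit()
```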
97 changes: 59 additions & 38 deletions pymilvus/bulk_writer/buffer.py
@@ -10,7 +10,6 @@
 # or implied. See the License for the specific language governing permissions and limitations under
 # the License.
 
-import csv
 import json
 import logging
 from pathlib import Path
@@ -290,49 +289,71 @@ def _persist_parquet(self, local_path: str, **kwargs):

     def _persist_csv(self, local_path: str, **kwargs):
         sep = self._config.get("sep", ",")
+        nullkey = self._config.get("nullkey", "")
 
         header = list(self._buffer.keys())
-        data = []
-        data_count = len(next(iter(self._buffer.values())))
-        for i in range(data_count):
-            row = []
-            for name in header:
-                field_schema = self._fields[name]
-
-                # null is not supported yet
-                # convert to string
-                if field_schema.dtype in {
-                    DataType.JSON,
-                    DataType.ARRAY,
-                    DataType.SPARSE_FLOAT_VECTOR,
-                    DataType.BINARY_VECTOR,
-                    DataType.FLOAT_VECTOR,
-                }:
-                    row.append(json.dumps(self._buffer[name][i]))
-                elif field_schema.dtype in {DataType.FLOAT16_VECTOR, DataType.BFLOAT16_VECTOR}:
-                    row.append(
-                        json.dumps(
-                            np.frombuffer(
-                                self._buffer[name][i],
-                                dtype=NUMPY_TYPE_CREATOR[field_schema.dtype.name],
-                            )
-                        )
-                    )
-                elif field_schema.dtype in {DataType.BOOL}:
-                    row.append("true" if self._buffer[name][i] else "false")
-                else:
-                    row.append(str(self._buffer[name][i]))
-            data.append(row)
-
-        rows = [header, *data]
+        data = pd.DataFrame(columns=header)
+        for k, v in self._buffer.items():
+            field_schema = self._fields[k]
+            # When df.to_csv() writes non-scalar data, repr() is used to
+            # convert the value to a string. If the value arr is [1.0, 2.0],
+            # repr(arr) changes with the type of arr:
+            # when arr is a list, the output is '[1.0, 2.0]'
+            # when arr is a tuple, the output is '(1.0, 2.0)'
+            # when arr is a np.array, the output is '[1.0 2.0]'
+            # We need the output to be '[1.0, 2.0]', consistent with the array
+            # format in JSON, so we can either:
+            # 1. make sure that arr of type
+            #    (BINARY_VECTOR, FLOAT_VECTOR, FLOAT16_VECTOR, BFLOAT16_VECTOR) is a list, or
+            # 2. convert arr into a string with json.dumps(arr) first and then add it to df.
+            # Method 2 is chosen here.
+            if field_schema.dtype in {
+                DataType.SPARSE_FLOAT_VECTOR,
+                DataType.BINARY_VECTOR,
+                DataType.FLOAT_VECTOR,
+            }:
+                arr = []
+                for val in v:
+                    arr.append(json.dumps(val))
+                data[k] = pd.Series(arr, dtype=np.dtype("str"))
+            elif field_schema.dtype in {DataType.FLOAT16_VECTOR, DataType.BFLOAT16_VECTOR}:
+                # special processing for float16 vectors: self._buffer stores
+                # them as bytes, so convert the bytes back to a float list
+                dt = (
+                    np.dtype("bfloat16")
+                    if (field_schema.dtype == DataType.BFLOAT16_VECTOR)
+                    else np.dtype("float16")
+                )
+                arr = []
+                for val in v:
+                    arr.append(json.dumps(np.frombuffer(val, dtype=dt).tolist()))
+                data[k] = pd.Series(arr, dtype=np.dtype("str"))
+            elif field_schema.dtype in {
+                DataType.JSON,
+                DataType.ARRAY,
+            }:
+                arr = []
+                for val in v:
+                    if val is None:
+                        arr.append(nullkey)
+                    else:
+                        arr.append(json.dumps(val))
+                data[k] = pd.Series(arr, dtype=np.dtype("str"))
+            elif field_schema.dtype in {DataType.BOOL}:
+                arr = []
+                for val in v:
+                    if val is not None:
+                        arr.append("true" if val else "false")
+                data[k] = pd.Series(arr, dtype=np.dtype("str"))
+            else:
+                data[k] = pd.Series(v, dtype=NUMPY_TYPE_CREATOR[field_schema.dtype.name])
 
         file_path = Path(local_path + ".csv")
         try:
-            with file_path.open("w", encoding="utf-8") as f:
-                writer = csv.writer(f, delimiter=sep)
-                writer.writerows(rows)
+            # pd.Series converts None to np.nan,
+            # so 'na_rep=nullkey' can replace NaN with nullkey
+            data.to_csv(file_path, sep=sep, na_rep=nullkey, index=False)
         except Exception as e:
             self._throw(f"Failed to persist file {file_path}, error: {e}")
 
-        logger.info(f"Successfully persist file {file_path}, row count: {len(rows)}")
+        logger.info("Successfully persist file %s, row count: %s", file_path, len(data))
         return [str(file_path)]
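The repr pitfall described in the new comment block is easy to reproduce, as is the fp16 bytes-to-list round trip that fixes the dump error; a minimal standalone sketch (bfloat16 would additionally need its dtype registered with NumPy, e.g. via the ml_dtypes package):

```python
import json

import numpy as np

vec = [1.0, 2.0]

# String conversion depends on the container type:
print(str(vec))            # [1.0, 2.0]
print(str(tuple(vec)))     # (1.0, 2.0)
print(str(np.array(vec)))  # [1. 2.]  <- no commas, not valid JSON

# json.dumps always yields the JSON array form the importer expects:
print(json.dumps(vec))     # [1.0, 2.0]

# float16 vectors sit in the buffer as raw bytes; np.frombuffer
# recovers the floats so they can be dumped as a JSON list:
raw = np.array(vec, dtype=np.float16).tobytes()
print(json.dumps(np.frombuffer(raw, dtype=np.float16).tolist()))  # [1.0, 2.0]
```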
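The `na_rep` behavior that replaces the old manual row writing can also be checked in isolation: pandas treats `None` in a column as missing and substitutes the marker on output. A minimal sketch:

```python
import pandas as pd

# None becomes NaN/missing inside the frame; na_rep writes the
# configured null marker (the buffer's nullkey) in its place.
df = pd.DataFrame({"flag": ["true", None, "false"]})
print(df.to_csv(na_rep="NULL", index=False))
# flag
# true
# NULL
# false
```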
14 changes: 14 additions & 0 deletions pymilvus/bulk_writer/bulk_writer.py
@@ -199,6 +199,20 @@ def _verify_row(self, row: dict):
                 self._throw(f"The field '{field.name}' is missed in the row")
 
             dtype = DataType(field.dtype)
+
+            # deal with null (None)
+            if field.nullable and row[field.name] is None:
+                if (
+                    field.default_value is not None
+                    and field.default_value.WhichOneof("data") is not None
+                ):
+                    # set default value
+                    data_type = field.default_value.WhichOneof("data")
+                    row[field.name] = getattr(field.default_value, data_type)
+                else:
+                    # skip field check if the field is null
+                    continue
+
             if dtype in {
                 DataType.BINARY_VECTOR,
                 DataType.FLOAT_VECTOR,
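The `WhichOneof("data")` call distinguishes "no default configured" from "default present": on a protobuf message, `WhichOneof` returns the name of whichever field in the oneof is set, or `None` if none is. A minimal sketch of the resulting rule, with hypothetical field/default objects standing in for the real schema types (only the logic mirrors the change above):

```python
def resolve_null(row: dict, field) -> None:
    """Hypothetical helper: substitute a configured default for a null value."""
    value = row.get(field.name)
    if field.nullable and value is None:
        default = field.default_value
        if default is not None and default.WhichOneof("data") is not None:
            # a default exists: substitute it for the null value
            kind = default.WhichOneof("data")  # e.g. "long_data", "string_data"
            row[field.name] = getattr(default, kind)
        # otherwise the value stays None and the type checks are skipped
```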
