Currently, the upload function in the RemoteBulkWriter class has no retry mechanism, so an upload fails [almost silently] if the remote storage system refuses to accept the request.
It would be good to equip this function with a simple retry mechanism, such as:
# Upload function with retry
# Imports the snippet relies on; `logger` is the module-level logger of the file.
import time
from pathlib import Path

from azure.storage.blob import BlobServiceClient
from minio import Minio
from minio.error import S3Error
from pymilvus.exceptions import MilvusException


def _upload(self, file_path: str, object_name: str, max_retries: int = 5):
    logger.info(f"Prepare to upload '{file_path}' to '{object_name}'")

    retry_count = 0
    while retry_count <= max_retries:
        try:
            # MinIO / S3-compatible storage
            if isinstance(self._client, Minio):
                logger.info(f"Target bucket: '{self._connect_param._bucket_name}'")
                self._client.fput_object(
                    bucket_name=self._connect_param._bucket_name,
                    object_name=object_name,
                    file_path=file_path,
                )
            # Azure Blob Storage
            elif isinstance(self._client, BlobServiceClient):
                logger.info(f"Target container: '{self._connect_param._container_name}'")
                container_client = self._client.get_container_client(
                    self._connect_param._container_name
                )
                with Path(file_path).open("rb") as data:
                    container_client.upload_blob(
                        name=object_name,
                        data=data,
                        overwrite=True,
                        max_concurrency=self._connect_param._upload_concurrency,
                        connection_timeout=600,
                    )
            else:
                raise MilvusException(message="Blob storage client is not initialized")

            # Upload succeeded: log and leave the retry loop
            logger.info(f"Upload file '{file_path}' to '{object_name}' succeeded")
            break
        except S3Error as e:
            # Retry S3 errors (e.g. SlowDown) until max_retries is exhausted
            if isinstance(self._client, Minio) and retry_count < max_retries:
                wait_time = 2 ** retry_count  # Exponential backoff: 1, 2, 4, ... seconds
                logger.warning(
                    f"S3 error while uploading '{file_path}': {e}. "
                    f"Retrying in {wait_time} seconds..."
                )
                time.sleep(wait_time)
                retry_count += 1
            else:
                logger.error(f"Failed to upload '{file_path}' to '{object_name}': {e}")
                raise
        except Exception as e:
            # Anything else is not retried
            logger.error(f"Unexpected error while uploading '{file_path}' to '{object_name}': {e}")
            raise
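The retry loop above could also be factored out of `_upload` into a small reusable helper, so that the MinIO and Azure branches can share it. The following is only a rough sketch of that idea: `retry_with_backoff`, its parameters, and the injectable `sleep` argument are hypothetical names used for illustration and are not part of pymilvus.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    retryable: Tuple[Type[BaseException], ...],
    max_retries: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation(), retrying on `retryable` errors with exponential backoff.

    Hypothetical helper: makes at most max_retries + 1 attempts and waits
    base_delay * 2**attempt seconds between them. Other exceptions propagate
    immediately without a retry.
    """
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")

    for attempt in range(max_retries + 1):
        try:
            return operation()
        except retryable:
            if attempt == max_retries:
                raise  # Retries exhausted: surface the last error to the caller
            sleep(base_delay * 2 ** attempt)  # 1x, 2x, 4x, ... base_delay

    raise AssertionError("unreachable")  # The loop always returns or raises
```

Under these assumptions, the MinIO branch could be wrapped as `retry_with_backoff(lambda: self._client.fput_object(...), (S3Error,), max_retries=max_retries)`. Passing `sleep` as a parameter is a design choice that lets tests run without actually waiting.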
Is there an existing issue for this?
Why is this needed?
A more robust file ingestion mechanism: transient storage errors would be retried instead of causing the upload to fail.
Anything else?
pymilvus/pymilvus/bulk_writer/remote_bulk_writer.py
Line 275 in 3110139