[Enhancement]: Upload retry mechanism in RemoteBulkWriter class #2341

Open
vinhngx opened this issue Nov 11, 2024 · 0 comments
Labels
kind/enhancement New feature or request


vinhngx commented Nov 11, 2024

Is there an existing issue for this?

  • I have searched the existing issues

What would you like to be added?

Currently, the upload function in the RemoteBulkWriter class has no retry mechanism. If the remote storage system refuses a request (for example, by throttling it with a SlowDown error), the upload fails almost silently.

It would be good to equip this function with a simple retry mechanism, for example:

import logging
import time
from pathlib import Path

from azure.storage.blob import BlobServiceClient
from minio import Minio
from minio.error import S3Error
from pymilvus.exceptions import MilvusException

logger = logging.getLogger(__name__)


# Upload function with retry (method of RemoteBulkWriter)
def _upload(self, file_path: str, object_name: str, max_retries: int = 5):
    logger.info(f"Prepare to upload '{file_path}' to '{object_name}'")

    retry_count = 0
    while retry_count <= max_retries:
        try:
            # MinIO / S3-compatible storage
            if isinstance(self._client, Minio):
                logger.info(f"Target bucket: '{self._connect_param._bucket_name}'")
                self._client.fput_object(
                    bucket_name=self._connect_param._bucket_name,
                    object_name=object_name,
                    file_path=file_path,
                )

            # Azure Blob Storage
            elif isinstance(self._client, BlobServiceClient):
                logger.info(f"Target bucket: '{self._connect_param._container_name}'")
                container_client = self._client.get_container_client(
                    self._connect_param._container_name
                )
                with Path(file_path).open("rb") as data:
                    container_client.upload_blob(
                        name=object_name,
                        data=data,
                        overwrite=True,
                        max_concurrency=self._connect_param._upload_concurrency,
                        connection_timeout=600,
                    )

            else:
                raise MilvusException(message="Blob storage client is not initialized")

            # Upload succeeded: log and stop retrying
            logger.info(f"Upload file '{file_path}' to '{object_name}' succeeded")
            break

        except S3Error as e:
            # S3Error comes from the MinIO client; retry with exponential
            # backoff (1, 2, 4, ... seconds) until max_retries is exhausted
            if isinstance(self._client, Minio) and retry_count < max_retries:
                wait_time = 2 ** retry_count
                logger.warning(
                    f"Upload of '{file_path}' rejected ({e.code}), retrying in "
                    f"{wait_time} seconds (retry {retry_count + 1}/{max_retries})..."
                )
                time.sleep(wait_time)
                retry_count += 1
            else:
                logger.error(f"Failed to upload '{file_path}' to '{object_name}': {e}")
                raise

        except Exception as e:
            # Any other error (including Azure errors) is not retried
            logger.error(f"Unexpected error while uploading '{file_path}' to '{object_name}': {e}")
            raise
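The proposal above retries every S3Error, including permanent ones such as a missing bucket or denied access, on a fixed `2 ** retry_count` schedule: with `max_retries=5` it waits 1 + 2 + 4 + 8 + 16 = 31 seconds across six attempts before giving up. Below is a minimal, storage-agnostic sketch of the same idea. It assumes that only throttling or transient S3 error codes should be retried. `retry_with_backoff`, `is_retryable` and `RETRYABLE_S3_CODES` are hypothetical names and are not part of pymilvus.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Assumption: these standard S3 error codes signal a transient condition
# (throttling or a temporary server fault) worth retrying. Permanent errors
# such as "NoSuchBucket" or "AccessDenied" are raised immediately.
RETRYABLE_S3_CODES = frozenset(
    {"SlowDown", "ServiceUnavailable", "InternalError", "RequestTimeout"}
)


def is_retryable(exc: Exception) -> bool:
    """True if exc has a ``code`` attribute naming a transient S3 error.

    minio's S3Error exposes ``code``; exceptions without it are not retried.
    """
    return getattr(exc, "code", None) in RETRYABLE_S3_CODES


def retry_with_backoff(
    fn: Callable[[], T],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    should_retry: Callable[[Exception], bool] = is_retryable,
    sleep: Callable[[float], None] = time.sleep,
    jitter: bool = True,
) -> T:
    """Call fn(), retrying up to max_retries times when should_retry(e) holds.

    Before retry n (0-based) it waits min(max_delay, base_delay * 2**n);
    with jitter=True that wait is scaled by a random factor in [0, 1)
    ("full jitter") so concurrent writers do not retry in lockstep.
    """
    attempt = 0
    while True:
        try:
            return fn()
        except Exception as e:
            # Give up on non-retryable errors or once retries are exhausted
            if attempt >= max_retries or not should_retry(e):
                raise
            delay = min(max_delay, base_delay * 2 ** attempt)
            if jitter:
                delay *= random.random()
            sleep(delay)
            attempt += 1
```

Inside the writer, the current single-attempt body (the Minio and Azure branches) could be moved into a helper, for instance a hypothetical `self._upload_once(file_path, object_name)`, and then called as `retry_with_backoff(lambda: self._upload_once(file_path, object_name))`. The predicate only checks a `code` attribute, so errors without a matching code are re-raised immediately. Retrying the Azure branch would therefore need its own predicate. Capping the delay and adding jitter are deliberate departures from the plain `2 ** retry_count` schedule. Injecting `sleep` keeps the helper testable without real waits.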

Why is this needed?

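A more robust file ingestion mechanism: transient rejections from the remote storage would be retried instead of failing the upload on the first attempt.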

Anything else?

def _upload(self, file_list: list):

vinhngx added the kind/enhancement (New feature or request) label on Nov 11, 2024
Development

No branches or pull requests

2 participants