diff --git a/polytope_server/common/datasource/polytope.py b/polytope_server/common/datasource/polytope.py
index 66cb197..aa67167 100644
--- a/polytope_server/common/datasource/polytope.py
+++ b/polytope_server/common/datasource/polytope.py
@@ -18,10 +18,10 @@
 # does it submit to any jurisdiction.
 #
 
+import copy
 import json
 import logging
 import os
-import copy
 
 import yaml
 from polytope.utility.exceptions import PolytopeError
@@ -61,7 +61,7 @@ def retrieve(self, request):
 
         # Set the "pre-path" for this request
         pre_path = {}
-        for k,v in r.items():
+        for k, v in r.items():
             if k in self.req_single_keys:
                 if isinstance(v, list):
                     v = v[0]
@@ -70,14 +70,13 @@ def retrieve(self, request):
 
         polytope_mars_config = copy.deepcopy(self.config)
         polytope_mars_config["options"]["pre_path"] = pre_path
-
         polytope_mars = PolytopeMars(
             polytope_mars_config,
-            log_context= {
-                "user": request.user.realm + ':' + request.user.username,
+            log_context={
+                "user": request.user.realm + ":" + request.user.username,
                 "id": request.id,
-            })
-
+            },
+        )
         try:
             self.output = polytope_mars.extract(r)
 
@@ -111,7 +110,7 @@ def match(self, request):
                 raise Exception("got {} : {}, but expected one of {}".format(k, r[k], v))
 
         # Check that there is only one value if required
-        for k, v in r.items(): 
+        for k, v in r.items():
             if k in self.req_single_keys:
                 v = [v] if isinstance(v, str) else v
                 if len(v) > 1:
diff --git a/polytope_server/common/staging/s3_boto3_staging.py b/polytope_server/common/staging/s3_boto3_staging.py
index 59c483a..7cf43d8 100644
--- a/polytope_server/common/staging/s3_boto3_staging.py
+++ b/polytope_server/common/staging/s3_boto3_staging.py
@@ -20,7 +20,6 @@
 
 import json
 import logging
-import random
 import time
 from concurrent.futures import Future, ThreadPoolExecutor
 
@@ -59,7 +58,6 @@ def submit(self, fn, /, *args, **kwargs):
 
 
 class S3Staging_boto3(staging.Staging):
     def __init__(self, config):
-
         self.bucket = config.get("bucket", "default")
         self.url = config.get("url", None)
@@ -76,17 +74,9 @@ def __init__(self, config):
         for name in ["boto", "urllib3", "s3transfer", "boto3", "botocore", "nose"]:
             logging.getLogger(name).setLevel(logging.WARNING)
 
-        prefix = "https" if self.use_ssl else "http"
-
-        if config.get("random_host", False):
-            self.host = config.get("random_host", {}).get("host", self.host)
-            index = random.randint(0, config.get("random_host", {}).get("max", 1) - 1)
-            # replace %%ID%% in the host with the index
-            self.host = self.host.replace("%%ID%%", str(index))
-            self.url = self.url + "/" + str(index)
-            logging.info(f"Using random host: {self.host}")
+        self.prefix = "https" if self.use_ssl else "http"
 
-        self._internal_url = f"{prefix}://{self.host}:{self.port}"
+        self._internal_url = f"http://{self.host}:{self.port}"
 
         # Setup Boto3 client
         self.s3_client = boto3.client(
@@ -125,7 +115,10 @@ def create(self, name, data, content_type):
         # else using content-disposition header
         try:
             multipart_upload = self.s3_client.create_multipart_upload(
-                Bucket=self.bucket, Key=name, ContentType=content_type, ContentDisposition="attachment"
+                Bucket=self.bucket,
+                Key=name,
+                ContentType=content_type,
+                ContentDisposition="attachment",
             )
             upload_id = multipart_upload["UploadId"]
 
@@ -140,7 +133,15 @@ def create(self, name, data, content_type):
                 else:
                     for part_data in self.iterator_buffer(data, self.buffer_size):
                         if part_data:
-                            futures.append(executor.submit(self.upload_part, name, part_number, part_data, upload_id))
+                            futures.append(
+                                executor.submit(
+                                    self.upload_part,
+                                    name,
+                                    part_number,
+                                    part_data,
+                                    upload_id,
+                                )
+                            )
                             part_number += 1
 
                 for future in futures:
@@ -153,7 +154,10 @@ def create(self, name, data, content_type):
                 raise ValueError("No data retrieved")
 
             self.s3_client.complete_multipart_upload(
-                Bucket=self.bucket, Key=name, UploadId=upload_id, MultipartUpload={"Parts": parts}
+                Bucket=self.bucket,
+                Key=name,
+                UploadId=upload_id,
+                MultipartUpload={"Parts": parts},
             )
 
             logging.info(f"Successfully uploaded {name} in {len(parts)} parts.")
@@ -168,7 +172,11 @@ def create(self, name, data, content_type):
 
     def upload_part(self, name, part_number, data, upload_id):
         logging.debug(f"Uploading part {part_number} of {name}, {len(data)} bytes")
         response = self.s3_client.upload_part(
-            Bucket=self.bucket, Key=name, PartNumber=part_number, UploadId=upload_id, Body=data
+            Bucket=self.bucket,
+            Key=name,
+            PartNumber=part_number,
+            UploadId=upload_id,
+            Body=data,
         )
         return {"PartNumber": part_number, "ETag": response["ETag"]}
 
@@ -190,14 +198,14 @@ def set_bucket_policy(self):
                 },
                 {
                     "Sid": "AllowListBucket",
-                    "Effect": "Allow",
+                    "Effect": "Deny",
                     "Principal": "*",
                     "Action": "s3:ListBucket",
                     "Resource": f"arn:aws:s3:::{self.bucket}",
                 },
                 {
                     "Sid": "AllowGetBucketLocation",
-                    "Effect": "Allow",
+                    "Effect": "Deny",
                     "Principal": "*",
                     "Action": "s3:GetBucketLocation",
                     "Resource": f"arn:aws:s3:::{self.bucket}",
@@ -239,7 +247,11 @@ def stat(self, name):
 
     def get_url(self, name):
         if self.url:
-            return f"{self.url}/{self.bucket}/{name}"
+            if self.url.startswith("http"):
+                # This covers both http and https
+                return f"{self.url}/{self.bucket}/{name}"
+            else:
+                return f"{self.prefix}://{self.url}/{self.bucket}/{name}"
         return None
 
     def get_internal_url(self, name):
diff --git a/polytope_server/common/staging/s3_staging.py b/polytope_server/common/staging/s3_staging.py
index 6f11e44..0976c4e 100644
--- a/polytope_server/common/staging/s3_staging.py
+++ b/polytope_server/common/staging/s3_staging.py
@@ -27,20 +27,33 @@
 #
 #######################################################################
 
-import copy
 import json
 import logging
 import time
+import warnings
+from collections import namedtuple
 from concurrent.futures import Future, ThreadPoolExecutor
 
-import minio
 from minio import Minio
-from minio.definitions import UploadPart
-from minio.error import BucketAlreadyExists, BucketAlreadyOwnedByYou, NoSuchKey
+from minio.error import S3Error
 
 from ..metric_collector import S3StorageMetricCollector
 from . import staging
 
+# Ensure that DeprecationWarnings are displayed
+warnings.simplefilter("always", DeprecationWarning)
+
+warnings.warn(
+    f"The '{__name__}' module is deprecated and will be removed in a future version. "
" + "Please migrate to the new module 's3_boto3' to avoid disruption.", + DeprecationWarning, + stacklevel=1, +) + + +# Defining a named tuple to represent a part with part_number and etag +Part = namedtuple("Part", ["part_number", "etag"]) + class AvailableThreadPoolExecutor(ThreadPoolExecutor): @@ -74,99 +87,148 @@ def __init__(self, config): self.port = config.get("port", "8000") self.max_threads = config.get("max_threads", 20) self.buffer_size = config.get("buffer_size", 20 * 1024 * 1024) - endpoint = "{}:{}".format(self.host, self.port) access_key = config.get("access_key", "") secret_key = config.get("secret_key", "") self.bucket = config.get("bucket", "default") secure = config.get("secure", False) self.url = config.get("url", None) - internal_url = "{}:{}".format(self.host, self.port) - secure = config.get("use_ssl", False) + self.internal_url = f"http://{self.host}:{self.port}" + self.use_ssl = config.get("use_ssl", False) + self.should_set_policy = config.get("should_set_policy", False) + + # remove the protocol from the internal_url, both http and https can be removed + endpoint = self.internal_url.split("://")[-1] if access_key == "" or secret_key == "": self.client = Minio( - internal_url, + endpoint, secure=secure, ) else: self.client = Minio( - internal_url, + endpoint, access_key=access_key, secret_key=secret_key, secure=secure, ) - self.internal_url = ("https://" if secure else "http://") + internal_url + self.prefix = "https" if self.use_ssl else "http" try: self.client.make_bucket(self.bucket) - self.client.set_bucket_policy(self.bucket, self.bucket_policy()) - except BucketAlreadyExists: - pass - except BucketAlreadyOwnedByYou: - pass - - self.storage_metric_collector = S3StorageMetricCollector(endpoint, self.client, self.bucket, self.get_type()) + if self.should_set_policy: + self.client.set_bucket_policy(self.bucket, self.bucket_policy()) + except S3Error as err: + if err.code in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"): + pass + else: + raise + + self.storage_metric_collector = S3StorageMetricCollector( + self.internal_url, self.client, self.bucket, self.get_type() + ) logging.info( "Opened data staging at {}:{}/{}, locatable from {}".format(self.host, self.port, self.bucket, self.url) ) - def upload_part(self, part_number, buf, metadata, name, upload_id): - logging.debug(f"Uploading part {part_number} ({len(buf)} bytes) of {name}") - etag = self.client._do_put_object( - self.bucket, name, buf, len(buf), part_number=part_number, metadata=metadata, upload_id=upload_id - ) - return etag, len(buf) - def create(self, name, data, content_type): - url = self.get_url(name) - logging.info("Putting to staging: {}".format(name)) + name = name + ".grib" + try: + # Prepare headers for content type and content disposition + headers = { + "Content-Type": content_type, + "Content-Disposition": "attachment", + } + + # Initiate a multipart upload + upload_id = self.client._create_multipart_upload( + bucket_name=self.bucket, + object_name=name, + headers=headers, + ) - metadata = minio.helpers.amzprefix_user_metadata({}) - metadata["Content-Type"] = content_type + parts = [] + part_number = 1 + futures = [] + + with AvailableThreadPoolExecutor(max_workers=self.max_threads) as executor: + executor.wait_for_available_worker() + if not data: + logging.info(f"No data provided. 
+                    # Upload an empty part
+                    result = self.upload_part(name, part_number, b"", upload_id)
+                    parts.append(result)
+                else:
+                    # Ensure 'data' is an iterable of bytes objects
+                    if isinstance(data, bytes):
+                        data_iter = [data]  # Wrap bytes object in a list to make it iterable
+                    elif hasattr(data, "read"):
+                        # If 'data' is a file-like object, read it in chunks
+                        data_iter = iter(lambda: data.read(self.buffer_size), b"")
+                    elif hasattr(data, "__iter__"):
+                        data_iter = data  # Assume it's already an iterable of bytes
+                    else:
+                        raise TypeError("data must be bytes, a file-like object, or an iterable over bytes")
+
+                    for part_data in self.iterator_buffer(data_iter, self.buffer_size):
+                        if part_data:
+                            futures.append(
+                                executor.submit(
+                                    self.upload_part,
+                                    name,
+                                    part_number,
+                                    part_data,
+                                    upload_id,
+                                )
+                            )
+                            part_number += 1
+
+                for future in futures:
+                    result = future.result()
+                    parts.append(result)
+
+            if not parts:
+                logging.warning(f"No parts uploaded for {name}. Aborting upload.")
+                self.client._abort_multipart_upload(self.bucket, name, upload_id)
+                raise ValueError("No data retrieved")
+
+            # Complete multipart upload
+            self.client._complete_multipart_upload(
+                bucket_name=self.bucket,
+                object_name=name,
+                upload_id=upload_id,
+                parts=parts,
+            )
 
-        upload_id = self.client._new_multipart_upload(self.bucket, name, metadata)
+            logging.info(f"Successfully uploaded {name} in {len(parts)} parts.")
+            return self.get_url(name)
 
-        parts = {}
-        part_number = 1
-        futures = []
+        except S3Error as e:
+            logging.error(f"Failed to upload {name}: {e}")
+            if "upload_id" in locals():
+                self.client._abort_multipart_upload(self.bucket, name, upload_id)
+            raise
 
-        with AvailableThreadPoolExecutor(max_workers=self.max_threads) as executor:
-            executor.wait_for_available_worker()
-            for buf in self.iterator_buffer(data, self.buffer_size):
-                if len(buf) == 0:
-                    break
-                future = executor.submit(self.upload_part, copy.copy(part_number), buf, metadata, name, upload_id)
-                futures.append((future, part_number))
-                part_number += 1
+    def upload_part(self, name, part_number, data, upload_id):
+        logging.debug(f"Uploading part {part_number} of {name}, {len(data)} bytes")
 
-        try:
-            for future, part_number in futures:
-                etag, size = future.result()
-                parts[part_number] = UploadPart(self.bucket, name, upload_id, part_number, etag, None, size)
-        except Exception as e:
-            logging.error(f"Error uploading parts: {str(e)}")
-            self.client._remove_incomplete_upload(self.bucket, name, upload_id)
-            raise
+        # 'data' is expected to be a bytes object
+        if not isinstance(data, bytes):
+            raise TypeError(f"'data' must be bytes, got {type(data)}")
 
-        # Completing upload
-        try:
-            logging.info(parts)
-            try:
-                self.client._complete_multipart_upload(self.bucket, name, upload_id, parts)
-            except Exception:
-                time.sleep(5)
-                self.client._complete_multipart_upload(self.bucket, name, upload_id, parts)
-
-        except Exception as e:
-            logging.error(f"Error completing multipart upload: {str(e)}")
-            self.client._remove_incomplete_upload(self.bucket, name, upload_id)
-            raise
+        response = self.client._upload_part(
+            bucket_name=self.bucket,
+            object_name=name,
+            data=data,
+            headers=None,
+            upload_id=upload_id,
+            part_number=part_number,
+        )
+        etag = response.replace('"', "")  # Remove any quotes from the ETag
 
-        logging.info("Put to {}".format(url))
-        return url
+        return Part(part_number=part_number, etag=etag)
 
     def read(self, name):
         try:
@@ -174,8 +236,11 @@ def read(self, name):
             if response.status == 200:
                 return response.data
logging.error("Could not read object {}, returned with status: {}".format(name, response.status)) - except NoSuchKey: - raise KeyError() + except S3Error as err: + if err.code == "NoSuchKey": + raise KeyError() + else: + raise def delete(self, name): if not self.query(name): @@ -188,15 +253,19 @@ def query(self, name): try: self.client.stat_object(self.bucket, name) return True - except NoSuchKey: - return False + except S3Error as err: + if err.code == "NoSuchKey": + return def stat(self, name): try: obj = self.client.stat_object(self.bucket, name) return obj.content_type, obj.size - except NoSuchKey: - raise KeyError() + except S3Error as err: + if err.code == "NoSuchKey": + raise KeyError() + else: + raise def list(self): resources = [] @@ -213,10 +282,13 @@ def collect_metric_info(self): return self.storage_metric_collector.collect().serialize() def get_url(self, name): - if self.url is None: - return None - url = "{}/{}/{}".format(self.url, self.bucket, name) - return url + if self.url: + if self.url.startswith("http"): + # This covers both http and https + return f"{self.url}/{self.bucket}/{name}" + else: + return f"{self.prefix}://{self.url}/{self.bucket}/{name}" + return None def get_internal_url(self, name): url = "{}/{}/{}".format(self.internal_url, self.bucket, name) @@ -246,14 +318,14 @@ def bucket_policy(self): }, { "Sid": "AllowListBucket", - "Effect": "Allow", + "Effect": "Deny", "Principal": "*", "Action": "s3:ListBucket", "Resource": f"arn:aws:s3:::{self.bucket}", }, { "Sid": "AllowGetBucketLocation", - "Effect": "Allow", + "Effect": "Deny", "Principal": "*", "Action": "s3:GetBucketLocation", "Resource": f"arn:aws:s3:::{self.bucket}",