diff --git a/.github/workflows/dataflow.yaml b/.github/workflows/dataflow.yaml index 53dabe0f..e682bf41 100644 --- a/.github/workflows/dataflow.yaml +++ b/.github/workflows/dataflow.yaml @@ -29,10 +29,9 @@ jobs: matrix: python-version: ["3.9", "3.10", "3.11"] recipes-version: [ - "pangeo-forge-recipes==0.9.4", + "pangeo-forge-recipes==0.10.0", # save some cycles and infer it might # work on all versions between lowest and highest - # "pangeo-forge-recipes==0.10.0", # "pangeo-forge-recipes==0.10.3", "pangeo-forge-recipes==0.10.4", ] diff --git a/.github/workflows/flink.yaml b/.github/workflows/flink.yaml index 070312f4..49297172 100644 --- a/.github/workflows/flink.yaml +++ b/.github/workflows/flink.yaml @@ -29,10 +29,9 @@ jobs: matrix: python-version: [ "3.9", "3.10", "3.11" ] recipes-version: [ - "pangeo-forge-recipes==0.9.4", + "pangeo-forge-recipes==0.10.0", # save some cycles and infer it might # work on all versions between lowest and highest - # "pangeo-forge-recipes==0.10.0", # "pangeo-forge-recipes==0.10.3", "pangeo-forge-recipes==0.10.4", ] diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index 3d1a02b2..cf509e9f 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -15,10 +15,9 @@ jobs: matrix: python-version: ["3.9", "3.10", "3.11"] recipes-version: [ - "pangeo-forge-recipes==0.9.4", + "pangeo-forge-recipes==0.10.0", # save some cycles and infer it might # work on all versions between lowest and highest - # "pangeo-forge-recipes==0.10.0", # "pangeo-forge-recipes==0.10.3", "pangeo-forge-recipes==0.10.4", ] diff --git a/MANIFEST.in b/MANIFEST.in index b797704c..1aba38f6 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,2 +1 @@ include LICENSE -include pangeo_forge_runner/commands/pangeo-forge-recipes-0.9-requirements.txt diff --git a/codecov.yml b/codecov.yml index e69de29b..025cd90e 100644 --- a/codecov.yml +++ b/codecov.yml @@ -0,0 +1,14 @@ +coverage: + precision: 2 + round: down + status: + 
project: + default: + target: 95 + informational: true + patch: off + changes: off +ignore: + - "setup.py" + - "tests/*" + - "**/__init__.py" diff --git a/docs/reference/storage.md b/docs/reference/storage.md index 7f043fc5..2ebd3e86 100644 --- a/docs/reference/storage.md +++ b/docs/reference/storage.md @@ -14,9 +14,3 @@ where `pangeo-forge-runner` puts intermediate & final data products. ```{eval-rst} .. autoconfigurable:: pangeo_forge_runner.storage.InputCacheStorage ``` - -## MetadataCacheStorage - -```{eval-rst} -.. autoconfigurable:: pangeo_forge_runner.storage.MetadataCacheStorage -``` diff --git a/pangeo_forge_runner/commands/bake.py b/pangeo_forge_runner/commands/bake.py index 3fa91d92..942a2b2a 100644 --- a/pangeo_forge_runner/commands/bake.py +++ b/pangeo_forge_runner/commands/bake.py @@ -11,7 +11,7 @@ from pathlib import Path import escapism -from apache_beam import Pipeline, PTransform +from apache_beam import Pipeline from traitlets import Bool, Type, Unicode, validate from .. import Feedstock @@ -19,14 +19,10 @@ from ..bakery.flink import FlinkOperatorBakery from ..bakery.local import LocalDirectBakery from ..plugin import get_injections, get_injectionspecs_from_entrypoints -from ..storage import InputCacheStorage, MetadataCacheStorage, TargetStorage +from ..storage import InputCacheStorage, TargetStorage from ..stream_capture import redirect_stderr, redirect_stdout from .base import BaseCommand, common_aliases, common_flags -PFR_0_9_REQUIREMENTS_FILE_PATH = ( - Path(__file__).parent / "pangeo-forge-recipes-0.9-requirements.txt" -) - class Bake(BaseCommand): """ @@ -184,7 +180,6 @@ def start(self): # with appropriate config from config file / commandline / defaults. 
target_storage = TargetStorage(parent=self) input_cache_storage = InputCacheStorage(parent=self) - metadata_cache_storage = MetadataCacheStorage(parent=self) self.log.info( f"Target Storage is {target_storage}\n", extra={"status": "setup"} @@ -192,10 +187,6 @@ def start(self): self.log.info( f"Input Cache Storage is {input_cache_storage}\n", extra={"status": "setup"} ) - self.log.info( - f"Metadata Cache Storage is {metadata_cache_storage}\n", - extra={"status": "setup"}, - ) injection_specs = get_injectionspecs_from_entrypoints() @@ -236,17 +227,17 @@ def start(self): self.log.info(f"Baking only recipe_id='{self.recipe_id}'") recipes = {k: r for k, r in recipes.items() if k == self.recipe_id} - if self.prune: - # Prune recipes to only run on certain items if we are asked to - if hasattr(next(iter(recipes.values())), "copy_pruned"): - # pangeo-forge-recipes version < 0.10 has a `copy_pruned` method - recipes = {k: r.copy_pruned() for k, r in recipes.items()} - bakery: Bakery = self.bakery_class(parent=self) extra_options = {} for name, recipe in recipes.items(): + if hasattr(recipe, "to_beam"): + # Catch recipes following pre-0.10 conventions and throw + raise ValueError( + "Unsupported recipe: please update to support pfr >=0.10 conventions." 
+ ) + if len(recipes) > 1: recipe_name_hash = hashlib.sha256(name.encode()).hexdigest()[:5] per_recipe_unique_job_name = ( @@ -261,16 +252,9 @@ def start(self): else: per_recipe_unique_job_name = None - # if pangeo-forge-recipes is <=0.9, we have to specify a requirements.txt - # file even if it isn't present, as the image used otherwise will not have pangeo-forge-recipes - if isinstance(recipe, PTransform): - requirements_path = feedstock.feedstock_dir / "requirements.txt" - if requirements_path.exists(): - extra_options["requirements_file"] = str(requirements_path) - else: - extra_options["requirements_file"] = str( - PFR_0_9_REQUIREMENTS_FILE_PATH - ) + requirements_path = feedstock.feedstock_dir / "requirements.txt" + if requirements_path.exists(): + extra_options["requirements_file"] = str(requirements_path) pipeline_options = bakery.get_pipeline_options( job_name=(per_recipe_unique_job_name or self.job_name), @@ -285,33 +269,7 @@ def start(self): # Chain our recipe to the pipeline. This mutates the `pipeline` object! # We expect `recipe` to either be a beam PTransform, or an object with a 'to_beam' # method that returns a transform. - if isinstance(recipe, PTransform): - # This means we are in pangeo-forge-recipes >=0.9 - pipeline | recipe - elif hasattr(recipe, "to_beam"): - # We are in pangeo-forge-recipes <=0.9 - # The import has to be here, as this import is not valid in pangeo-forge-recipes>=0.9 - # NOTE: `StorageConfig` only requires a target; input and metadata caches are optional, - # so those are handled conditionally if provided. 
- from pangeo_forge_recipes.storage import StorageConfig - - recipe.storage_config = StorageConfig( - target_storage.get_forge_target(job_name=self.job_name), - ) - for attrname, optional_storage in zip( - ("cache", "metadata"), - (input_cache_storage, metadata_cache_storage), - ): - if not optional_storage.is_default(): - setattr( - recipe.storage_config, - attrname, - optional_storage.get_forge_target( - job_name=self.job_name - ), - ) - # with configured storage now attached, compile recipe to beam - pipeline | recipe.to_beam() + pipeline | recipe # Some bakeries are blocking - if Beam is configured to use them, calling # pipeline.run() blocks. Some are not. We handle that here, and provide diff --git a/pangeo_forge_runner/commands/pangeo-forge-recipes-0.9-requirements.txt b/pangeo_forge_runner/commands/pangeo-forge-recipes-0.9-requirements.txt deleted file mode 100644 index a281a6e1..00000000 --- a/pangeo_forge_runner/commands/pangeo-forge-recipes-0.9-requirements.txt +++ /dev/null @@ -1,5 +0,0 @@ -# List of packages installed in the container image when -# baking a recipe that's using pangeo-forge-recipes<0.10 -gcsfs -pangeo-forge-recipes<0.10 -s3fs diff --git a/pangeo_forge_runner/storage.py b/pangeo_forge_runner/storage.py index c3d19f03..9851b75f 100644 --- a/pangeo_forge_runner/storage.py +++ b/pangeo_forge_runner/storage.py @@ -105,11 +105,3 @@ class InputCacheStorage(StorageTargetConfig): """ pangeo_forge_target_class = "CacheFSSpecTarget" - - -class MetadataCacheStorage(StorageTargetConfig): - """ - Storage configuration for caching metadata during recipe baking - """ - - pangeo_forge_target_class = "MetadataTarget" diff --git a/tests/integration/flink/test_flink_integration.py b/tests/integration/flink/test_flink_integration.py index 84e13799..6f1d9260 100644 --- a/tests/integration/flink/test_flink_integration.py +++ b/tests/integration/flink/test_flink_integration.py @@ -4,7 +4,6 @@ import time from importlib.metadata import version -import pytest 
import xarray as xr from packaging.version import parse as parse_version @@ -19,12 +18,6 @@ def test_flink_bake(minio_service, flinkversion, pythonversion, beamversion): pfr_version = parse_version(version("pangeo-forge-recipes")) if pfr_version >= parse_version("0.10"): recipe_version_ref = str(pfr_version) - else: - recipe_version_ref = "0.9.x" - pytest.xfail( - f"{pfr_version = }, which is < 0.10. " - "Flink tests timeout with this recipes version, so we xfail this test." - ) bucket = "s3://gpcp-out" config = { @@ -47,11 +40,6 @@ def test_flink_bake(minio_service, flinkversion, pythonversion, beamversion): "fsspec_args": fsspec_args, "root_path": bucket + "/input-cache/{job_name}", }, - "MetadataCacheStorage": { - "fsspec_class": "s3fs.S3FileSystem", - "fsspec_args": fsspec_args, - "root_path": bucket + "/metadata-cache/{job_name}", - }, "FlinkOperatorBakery": { "flink_version": flinkversion, "job_manager_resources": {"memory": "1024m", "cpu": 0.30}, diff --git a/tests/integration/test_dataflow_integration.py b/tests/integration/test_dataflow_integration.py index d1797e20..2e56c285 100644 --- a/tests/integration/test_dataflow_integration.py +++ b/tests/integration/test_dataflow_integration.py @@ -14,7 +14,9 @@ def test_dataflow_integration(): if pfr_version >= parse_version("0.10"): recipe_version_ref = str(pfr_version) else: - recipe_version_ref = "0.9.x" + raise ValueError( + f"Unsupported pfr_version: {pfr_version}. Please upgrade to 0.10 or newer." 
+        )
     bucket = "gs://pangeo-forge-runner-ci-testing"
     config = {
         "Bake": {
@@ -30,10 +32,6 @@
             "fsspec_class": "gcsfs.GCSFileSystem",
             "root_path": bucket + "/input-cache/{job_name}",
         },
-        "MetadataCacheStorage": {
-            "fsspec_class": "gcsfs.GCSFileSystem",
-            "root_path": bucket + "/metadata-cache/{job_name}",
-        },
     }
 
     with tempfile.NamedTemporaryFile("w", suffix=".json") as f:
diff --git a/tests/unit/test_bake.py b/tests/unit/test_bake.py
index 5281b908..f10fcb41 100644
--- a/tests/unit/test_bake.py
+++ b/tests/unit/test_bake.py
@@ -108,7 +108,9 @@ def recipes_version_ref(request):
     if pfr_version >= parse_version("0.10"):
         recipes_version_ref = "0.10.x"
     else:
-        recipes_version_ref = "0.9.x"
+        raise ValueError(
+            f"Unsupported pfr_version: {pfr_version}. Please upgrade to 0.10 or newer."
+        )
     return (
         recipes_version_ref
         if not request.param == "dict_object"
@@ -168,11 +170,6 @@ def test_gpcp_bake(
             "fsspec_args": fsspec_args,
             "root_path": "s3://gpcp/input-cache/",
         },
-        "MetadataCacheStorage": {
-            "fsspec_class": "s3fs.S3FileSystem",
-            "fsspec_args": fsspec_args,
-            "root_path": "s3://gpcp/metadata-cache/",
-        },
     }
 
     if no_input_cache:
@@ -208,9 +205,6 @@
     if expected_error:
         assert proc.returncode == 1
-        stdout[-1] == expected_error
+        assert stdout[-1] == expected_error
-    elif no_input_cache and recipes_version_ref == "0.9.x":
-        # no_input_cache is only supported in 0.10.x and above
-        assert proc.returncode == 1
     else:
         assert proc.returncode == 0