Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove support for pfr<=0.9.x #170

Merged
merged 5 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/dataflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ jobs:
matrix:
python-version: ["3.9", "3.10", "3.11"]
recipes-version: [
"pangeo-forge-recipes==0.9.4",
"pangeo-forge-recipes==0.10.0",
# save some cycles and infer it might
# work on all versions between lowest and highest
# "pangeo-forge-recipes==0.10.0",
# "pangeo-forge-recipes==0.10.3",
"pangeo-forge-recipes==0.10.4",
]
Expand Down
3 changes: 1 addition & 2 deletions .github/workflows/flink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ jobs:
matrix:
python-version: [ "3.9", "3.10", "3.11" ]
recipes-version: [
"pangeo-forge-recipes==0.9.4",
"pangeo-forge-recipes==0.10.0",
# save some cycles and infer it might
# work on all versions between lowest and highest
# "pangeo-forge-recipes==0.10.0",
# "pangeo-forge-recipes==0.10.3",
"pangeo-forge-recipes==0.10.4",
]
Expand Down
3 changes: 1 addition & 2 deletions .github/workflows/unit-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ jobs:
matrix:
python-version: ["3.9", "3.10", "3.11"]
recipes-version: [
"pangeo-forge-recipes==0.9.4",
"pangeo-forge-recipes==0.10.0",
# save some cycles and infer it might
# work on all versions between lowest and highest
# "pangeo-forge-recipes==0.10.0",
# "pangeo-forge-recipes==0.10.3",
"pangeo-forge-recipes==0.10.4",
]
Expand Down
1 change: 0 additions & 1 deletion MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
include LICENSE
include pangeo_forge_runner/commands/pangeo-forge-recipes-0.9-requirements.txt
14 changes: 14 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
coverage:
precision: 2
round: down
status:
project:
default:
target: 95
informational: true
patch: off
changes: off
ignore:
- "setup.py"
- "tests/*"
- "**/__init__.py"
6 changes: 0 additions & 6 deletions docs/reference/storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,3 @@ where `pangeo-forge-runner` puts intermediate & final data products.
```{eval-rst}
.. autoconfigurable:: pangeo_forge_runner.storage.InputCacheStorage
```

## MetadataCacheStorage

```{eval-rst}
.. autoconfigurable:: pangeo_forge_runner.storage.MetadataCacheStorage
```
66 changes: 12 additions & 54 deletions pangeo_forge_runner/commands/bake.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,18 @@
from pathlib import Path

import escapism
from apache_beam import Pipeline, PTransform
from apache_beam import Pipeline
from traitlets import Bool, Type, Unicode, validate

from .. import Feedstock
from ..bakery.base import Bakery
from ..bakery.flink import FlinkOperatorBakery
from ..bakery.local import LocalDirectBakery
from ..plugin import get_injections, get_injectionspecs_from_entrypoints
from ..storage import InputCacheStorage, MetadataCacheStorage, TargetStorage
from ..storage import InputCacheStorage, TargetStorage
from ..stream_capture import redirect_stderr, redirect_stdout
from .base import BaseCommand, common_aliases, common_flags

PFR_0_9_REQUIREMENTS_FILE_PATH = (
Path(__file__).parent / "pangeo-forge-recipes-0.9-requirements.txt"
)


class Bake(BaseCommand):
"""
Expand Down Expand Up @@ -183,18 +179,13 @@ def start(self):
# with appropriate config from config file / commandline / defaults.
target_storage = TargetStorage(parent=self)
input_cache_storage = InputCacheStorage(parent=self)
metadata_cache_storage = MetadataCacheStorage(parent=self)

self.log.info(
f"Target Storage is {target_storage}\n", extra={"status": "setup"}
)
self.log.info(
f"Input Cache Storage is {input_cache_storage}\n", extra={"status": "setup"}
)
self.log.info(
f"Metadata Cache Storage is {metadata_cache_storage}\n",
extra={"status": "setup"},
)

injection_specs = get_injectionspecs_from_entrypoints()

Expand Down Expand Up @@ -234,17 +225,17 @@ def start(self):
self.log.info(f"Baking only recipe_id='{self.recipe_id}'")
recipes = {k: r for k, r in recipes.items() if k == self.recipe_id}

if self.prune:
# Prune recipes to only run on certain items if we are asked to
if hasattr(next(iter(recipes.values())), "copy_pruned"):
# pangeo-forge-recipes version < 0.10 has a `copy_pruned` method
recipes = {k: r.copy_pruned() for k, r in recipes.items()}

bakery: Bakery = self.bakery_class(parent=self)

extra_options = {}

for name, recipe in recipes.items():
if hasattr(recipe, "to_beam"):
# Catch recipes following pre-0.10 conventions and throw
raise ValueError(
"Unsupported recipe: please update to support pfr >=0.10 conventions."
)

if len(recipes) > 1:
recipe_name_hash = hashlib.sha256(name.encode()).hexdigest()[:5]
per_recipe_unique_job_name = (
Expand All @@ -259,16 +250,9 @@ def start(self):
else:
per_recipe_unique_job_name = None

# if pangeo-forge-recipes is <=0.9, we have to specify a requirements.txt
# file even if it isn't present, as the image used otherwise will not have pangeo-forge-recipes
if isinstance(recipe, PTransform):
requirements_path = feedstock.feedstock_dir / "requirements.txt"
if requirements_path.exists():
extra_options["requirements_file"] = str(requirements_path)
else:
extra_options["requirements_file"] = str(
PFR_0_9_REQUIREMENTS_FILE_PATH
)
requirements_path = feedstock.feedstock_dir / "requirements.txt"
if requirements_path.exists():
extra_options["requirements_file"] = str(requirements_path)

pipeline_options = bakery.get_pipeline_options(
job_name=(per_recipe_unique_job_name or self.job_name),
Expand All @@ -283,33 +267,7 @@ def start(self):
# Chain our recipe to the pipeline. This mutates the `pipeline` object!
# We expect `recipe` to either be a beam PTransform, or an object with a 'to_beam'
# method that returns a transform.
if isinstance(recipe, PTransform):
# This means we are in pangeo-forge-recipes >=0.9
pipeline | recipe
elif hasattr(recipe, "to_beam"):
# We are in pangeo-forge-recipes <=0.9
# The import has to be here, as this import is not valid in pangeo-forge-recipes>=0.9
# NOTE: `StorageConfig` only requires a target; input and metadata caches are optional,
# so those are handled conditionally if provided.
from pangeo_forge_recipes.storage import StorageConfig

recipe.storage_config = StorageConfig(
target_storage.get_forge_target(job_name=self.job_name),
)
for attrname, optional_storage in zip(
("cache", "metadata"),
(input_cache_storage, metadata_cache_storage),
):
if not optional_storage.is_default():
setattr(
recipe.storage_config,
attrname,
optional_storage.get_forge_target(
job_name=self.job_name
),
)
# with configured storage now attached, compile recipe to beam
pipeline | recipe.to_beam()
pipeline | recipe

# Some bakeries are blocking - if Beam is configured to use them, calling
# pipeline.run() blocks. Some are not. We handle that here, and provide
Expand Down

This file was deleted.

8 changes: 0 additions & 8 deletions pangeo_forge_runner/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,3 @@ class InputCacheStorage(StorageTargetConfig):
"""

pangeo_forge_target_class = "CacheFSSpecTarget"


class MetadataCacheStorage(StorageTargetConfig):
"""
Storage configuration for caching metadata during recipe baking
"""

pangeo_forge_target_class = "MetadataTarget"
12 changes: 0 additions & 12 deletions tests/integration/flink/test_flink_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import time
from importlib.metadata import version

import pytest
import xarray as xr
from packaging.version import parse as parse_version

Expand All @@ -19,12 +18,6 @@ def test_flink_bake(minio_service, flinkversion, pythonversion, beamversion):
pfr_version = parse_version(version("pangeo-forge-recipes"))
if pfr_version >= parse_version("0.10"):
recipe_version_ref = str(pfr_version)
else:
recipe_version_ref = "0.9.x"
pytest.xfail(
f"{pfr_version = }, which is < 0.10. "
"Flink tests timeout with this recipes version, so we xfail this test."
)

bucket = "s3://gpcp-out"
config = {
Expand All @@ -47,11 +40,6 @@ def test_flink_bake(minio_service, flinkversion, pythonversion, beamversion):
"fsspec_args": fsspec_args,
"root_path": bucket + "/input-cache/{job_name}",
},
"MetadataCacheStorage": {
"fsspec_class": "s3fs.S3FileSystem",
"fsspec_args": fsspec_args,
"root_path": bucket + "/metadata-cache/{job_name}",
},
"FlinkOperatorBakery": {
"flink_version": flinkversion,
"job_manager_resources": {"memory": "1024m", "cpu": 0.30},
Expand Down
8 changes: 3 additions & 5 deletions tests/integration/test_dataflow_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ def test_dataflow_integration():
if pfr_version >= parse_version("0.10"):
recipe_version_ref = str(pfr_version)
else:
recipe_version_ref = "0.9.x"
raise ValueError(
f"Unsupported pfr_version: {pfr_version}. Please upgrade to 0.10 or newer."
)
bucket = "gs://pangeo-forge-runner-ci-testing"
config = {
"Bake": {
Expand All @@ -30,10 +32,6 @@ def test_dataflow_integration():
"fsspec_class": "gcsfs.GCSFileSystem",
"root_path": bucket + "/input-cache/{job_name}",
},
"MetadataCacheStorage": {
"fsspec_class": "gcsfs.GCSFileSystem",
"root_path": bucket + "/metadata-cache/{job_name}",
},
}

with tempfile.NamedTemporaryFile("w", suffix=".json") as f:
Expand Down
12 changes: 3 additions & 9 deletions tests/unit/test_bake.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ def recipes_version_ref(request):
if pfr_version >= parse_version("0.10"):
recipes_version_ref = "0.10.x"
else:
recipes_version_ref = "0.9.x"
raise ValueError(
f"Unsupported pfr_version: {pfr_version}. Please upgrade to 0.10 or newer."
)
return (
recipes_version_ref
if not request.param == "dict_object"
Expand Down Expand Up @@ -168,11 +170,6 @@ def test_gpcp_bake(
"fsspec_args": fsspec_args,
"root_path": "s3://gpcp/input-cache/",
},
"MetadataCacheStorage": {
"fsspec_class": "s3fs.S3FileSystem",
"fsspec_args": fsspec_args,
"root_path": "s3://gpcp/metadata-cache/",
},
}

if no_input_cache:
Expand Down Expand Up @@ -208,9 +205,6 @@ def test_gpcp_bake(
if expected_error:
assert proc.returncode == 1
stdout[-1] == expected_error
elif no_input_cache and recipes_version_ref == "0.9.x":
# no_input_cache is only supported in 0.10.x and above
assert proc.returncode == 1
else:
assert proc.returncode == 0

Expand Down
Loading