Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable lazy pipeline mutations via recipe functions #168

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

moradology
Copy link
Contributor

@moradology moradology commented Feb 9, 2024

Recently (while trying to figure out why it is so hard to call to_runner_api() for DAG serialization) did a deep dive into how beam handles its pipelines and transformations. It is horrifying. So: every time you pipe to a transform what's actually happening is your Pipeline instance is internally mutating the state of its job-graph. The frustrating thing is that our current strategy tries to take a PTransform and apply it to a pipeline that's created later. This does not work as expected. Not at all.

Here's something you can do in beam:

pipeline = beam.Pipeline()
recipe = pipeline | beam.Create([1,2,3])
recipe | "BRANCH1" >> beam.Map(lambda x: x)
recipe | "BRANCH2" >> beam.FlatMap(lambda x: [x])

Unfortunately, this (which is currently how things work around here) does not work:

recipe = beam.Create([1,2,3])
recipe | "BRANCH1" >> beam.Map(lambda x: x)
recipe | "BRANCH2" >> beam.FlatMap(lambda x: [x])
pipeline = beam.Pipeline()
pipeline | recipe

To make matters worse: those "unique names" (e.g "BRANCH1") are just lost. They're gone. The PTransform doesn't hang onto them. The Pipeline does. Through mutations. This is very confusing.

This branch fixes that. Recipes can now be functions that take a pipeline and apply mutations. Mutations are still bad, but now we can use them as intended.

def recipe(p: beam.Pipeline) -> None:
    init = p | beam.Create([1,2,3])
    init | "BRANCH1" >> beam.Map(lambda x: x)
    init | "BRANCH2" >> beam.FlatMap(lambda x: [x])

So: now we have unique names and we can do branching pipelines. Probably other things too!

Copy link

codecov bot commented Feb 9, 2024

Codecov Report

Attention: 1 lines in your changes are missing coverage. Please review.

Comparison is base (9667ae6) 95.53% compared to head (98792bd) 95.35%.

Files Patch % Lines
pangeo_forge_runner/commands/bake.py 66.66% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #168      +/-   ##
==========================================
- Coverage   95.53%   95.35%   -0.19%     
==========================================
  Files          14       14              
  Lines         493      495       +2     
==========================================
+ Hits          471      472       +1     
- Misses         22       23       +1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@@ -261,7 +262,7 @@ def start(self):

# if pangeo-forge-recipes is <=0.9, we have to specify a requirements.txt
# file even if it isn't present, as the image used otherwise will not have pangeo-forge-recipes
if isinstance(recipe, PTransform):
if isinstance(recipe, PTransform) or inspect.isfunction(recipe):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use callable instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems totally doable - I wasn't actually familiar with inspect.isfunction so perhaps there are implications here that I'm unfamiliar with?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, callable will appropriately check for anything with the callable protocol as well as callable classes while inspect.isfunction will explicitly check just for functions. Since the goal here is to just call the recipe (rather than do something specific to functions, like look at their source), I think callable is the right thing here.

@yuvipanda
Copy link
Collaborator

PRs that discover things that are described with:

It is horrifying

are my favorite kind of PRs! :)

@ranchodeluxe
Copy link
Collaborator

ranchodeluxe commented Feb 12, 2024

Some feedback about this that we need to look into :

  1. The job we kicked off on Thursday and Friday seems to have finished (meaning the output was there for zarr and pyramids) but the job never completed once inputs were exhausted -- is it possible this is producing a strange DAG?

  2. Also strange: I'm noticing that it STILL is not resolving pip requirements correctly and I end up with 9.x installed (this was after modifying the recipe so it could be related to that)

  3. We should definitely figure out how to incorporate this change if branching is gonna be useful but I guess I'm not convinced that blindly using the same inputs for two different types of jobs is wise? For example, I imagine StoreToZarr and StoreToPyramid and WriteCombinedReference jobs each have unique considerations for how they will run in a distributed manner (e.g. how we'll group inputs and route to specific workers might change between jobs). So why not just have them be separate jobs to begin with? Thoughts?

@moradology
Copy link
Contributor Author

moradology commented Feb 12, 2024

  1. I doubt it is a strange dag at least insofar as it was the work here which made it strange. Good news is that we can surely produce some logging with to_runner_api that will explain the desired outputs... That said, if we can really expect these graphs to be directed and acyclic, I'd be surprised to see the definition of the job as being at issue

  2. I'm afraid I'm not following the relation of requirements to this issue (sorry, brain still gearing up for the week)

  3. I'd guess that a lot of the same patterns of locality maximization are likely to come up across the board such that work can be saved by reading, cleaning, etc. before sorting/grouping by keys and then forking out to write processes. Either way, without this amendment I'm concerned that the beam API's DAG DSL is fundamentally broken and that we might not even know the extent to which that's true without diving deeper into exactly what state a Pipeline instance hangs onto that a chain of PTransforms will lose

@moradology
Copy link
Contributor Author

Here's a clear example of how important it can be to have multiple forking paths when defining beam pipelines: https://github.com/pangeo-forge/pangeo-forge-recipes/pull/689/files#diff-8bac120398898793cd4f9daf94551b1f3d3f1867bed8a68b14cceed49d6dc30fR503-R507

Now, I can't say with 100% certainty that this is the only way to get statistics which may be necessary for distributed work that requires global context but I can say that it is the only way I've found. If people have other ideas, I'd be very interested. If I'm right though, this feature is pretty much a must-have for us to enable complex recipes. In the above case, the issue doesn't come up because it happens in a slightly different context and the expand happens after a pipeline is available (it is lazy) but it illustrates the kind of workflow recipe writers may want/need to engage in

@cisaacstern
Copy link
Member

cisaacstern commented Feb 21, 2024

I've been thinking about this for a majority of the day today.

First of all, @moradology thank you for surfacing such deep insights about pipeline mutations and their relationship to branching DAGs in Beam. This is hard-won understanding for our community, which we absolutely should leverage to inform a revision of how we approach pipeline construction.

In terms of the callable proposal specifically: I'm wondering if we've gotten ourselves into the current predicament by trying to wrap Beam in some non-standard ways, and therefore maybe the best path forward is to try to get back to something that looks more like vanilla Beam, not less?

This PR does an elegant job of finding a way out of our current bind while changing very little here in runner, but this change has a rather large implication for what recipes themselves will need to look like: the object that recipe developers hand off to runner becomes (optionally, I suppose) a callable that mutates a pipeline. From the recipe look and feel standpoint, this seems to take us if anything even a bit further away than we currently are from vanilla Beam.

Which is long preamble to say: now that you've so clearly and helpfully identified this shortcoming of our current implementation, is this the right time to do something which has been discussed off and on since our beam work began; namely, should we make the change to have recipes be "just" a Beam script, with a requirement that they conclude with:

with beam.Pipeline() as p:
  p | ...

?

This brings us back into the mainstream of what's recommended by Beam itself, with all the attendant benefits of that, among them support for branching pipelines.

If we go this route, the scope of runner is narrowed to be just focused on injecting or otherwise providing the necessary PipelineOptions at script invocation time.

@moradology
Copy link
Contributor Author

moradology commented Feb 22, 2024

It's sounding as though the alternative proposal here would be to construct recipes as full pipelines? I think the with Pipeline as p: construct would likely need to be changed to assignment (recipe = beam.Pipeline(options) which doesn't seem like an antipattern as far as I can tell) and remains quite close to what one finds in some examples

What gives me some pause here is that PipelineOptions are added at the time of Pipeline construction. Perhaps there's a workaround for attaching such options later but I wonder if there's not something to be gained by the inversion of control that returning an actual function (and not just a Pipeline object) provides. I'm imagining a case in which there is, in addition to the pipeline configuration we'll need to sort out, PTransform changes baked into configuration that will perhaps depend on the specific environment in which code is run. Here's an example that recently made its way into the recipes which I could imagine people wanting to expose and defer decision making on rather than decide for all future runs: https://github.com/pangeo-forge/pangeo-forge-recipes/blob/main/pangeo_forge_recipes/transforms.py#L450

One area that's not clear, assuming the recipe-as-function approach, is exactly what we should expect as arguments. I can imagine at least a few alternatives Pipeline, Config, PipelineOptions, Config, even just Config with an expectation that recipes construct their options intelligently (I'd shy away from this given the constraints a selected Runner will put on how PipelineOptions are constructed.

I'm likely biased here, as I generally am a fan of passing around functions and being as lazy (in the principled, execution-semantics sense!) as possible and I know that some find it simpler to approach software with procedural, eager-execution in mind. This makes me a bit hesitant to overstate my case given how central it is as a design decision although to my mind recipe and function (with side effects) are very nearly synonymous

An example of how this deferral might enable more generality for recipes while keeping their specifics decoupled from the runner (a slight modification of this):

def recipe(pipeline: beam.Pipeline, config: SomeConfig):
    pipeline | beam.Create(config.pattern.items())
    | OpenWithKerchunk(
        remote_protocol=earthdata_protocol,
        file_type=config.pattern.file_type,
        inline_threshold=config.inline_threshold,
        storage_options=config.auth_args,
    )
    | WriteCombinedReference(
        concat_dims=config.CONCAT_DIMS,
        identical_dims=config.IDENTICAL_DIMS,
        store_name=config.SHORT_NAME
    ))
  1. the pipeline provided here can be expected to come pre-configured with execution-level details
  2. the runner need not have strong opinions about what kind of information is expected on config
  3. the runner could, however, use a type provided via the recipe file (and associated meta) to validate putative configurations at pipeline "compile time"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants