
Dask Integration for Analysis Grand Challenge #712

Closed

goseind opened this issue May 30, 2023 · 3 comments

goseind (Member) commented on May 30, 2023

Motivation

As discussed in today's Analysis Grand Challenge meeting, we would like to enhance REANA to support Dask as a backend for distributed analysis on Kubernetes.

Modifications

This would involve allowing the user to easily enable the use of a Dask cluster in their workflow file, e.g.:

Dask: true
# some more options tbd

In the background, this would create a Dask cluster from the user pod. This would also require additional Helm values in the REANA Helm chart to configure the Dask cluster backend in the first place, e.g. the target Kubernetes cluster.
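As a hypothetical sketch of what such additional Helm values could look like (none of these keys exist in the REANA chart today; they only illustrate the shape of the configuration):

dask:
  enabled: true            # allow workflows to request Dask clusters
  backend: kubernetes      # where Dask clusters get spawned
  defaultVersion: "2023.1.0"
  workers:
    replicas: 2
    memoryLimit: 2Gi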

Refer also to this CodiMD for more details on today's meeting.

egazzarr (Member) commented

To be tested on the AGC Dask example: https://github.com/root-project/analysis-grand-challenge

tiborsimko (Member) commented

I have done a proof-of-concept test on my local REANA deployment. Everything works well.

1. Bring up a local Dask service inside the REANA cluster:
$ helm install dask dask/dask --version 2023.1.0
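For a slightly larger test, the default worker count can be overridden at install time; worker.replicas is one of the values exposed by the dask/dask Helm chart (check helm show values dask/dask --version 2023.1.0 to confirm the exact keys):

$ helm install dask dask/dask --version 2023.1.0 --set worker.replicas=4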
2. Detect the Dask scheduler connection URI:
$ # Extract the scheduler service's ClusterIP (column 3) and port (first four characters of column 5):
$ DASK_SCHEDULER_URI=$(kubectl get services dask-scheduler | awk 'NR==2 {print "tcp://" $3 ":" substr($5,0,4)}')
$ echo $DASK_SCHEDULER_URI
tcp://10.96.4.51:8786
3. Create a simple Dask-based user analysis example for the proof-of-concept:
import os

import dask
import dask.array
import dask.distributed

# Connect to the Dask scheduler, falling back to a local default if unset.
DASK_SCHEDULER_URI = os.getenv("DASK_SCHEDULER_URI", "tcp://127.0.0.1:8080")
client = dask.distributed.Client(DASK_SCHEDULER_URI)

# Build a lazy task graph: random matrix, symmetrisation, row-subsampled column means.
x = dask.array.random.random((10000, 10000), chunks=(1000, 1000))
y = x + x.T
z = y[::2, 5000:].mean(axis=1)

# Trigger the distributed computation and fetch the result.
result = z.compute()
print(result)
4. Create an accompanying reana.yaml workflow specification using the above Dask scheduler connection information:
inputs:
  files:
    - dask_demo.py
workflow:
  type: serial
  specification:
    steps:
      - name: mystep
        environment: 'ghcr.io/dask/dask:2023.1.0'
        commands:
        - DASK_SCHEDULER_URI=tcp://10.96.4.51:8786 python dask_demo.py > dask_demo.res
outputs:
  files:
    - dask_demo.res
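Note that the scheduler URI is hard-coded here for the proof-of-concept. Since Kubernetes services are resolvable by name from pods in the same namespace, the step could presumably use the service DNS name instead of the ClusterIP detected in step 2 (assuming the REANA job pods run in the same namespace as the dask-scheduler service):

      - name: mystep
        environment: 'ghcr.io/dask/dask:2023.1.0'
        commands:
        - DASK_SCHEDULER_URI=tcp://dask-scheduler:8786 python dask_demo.py > dask_demo.res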
5. Run this example on REANA:
$ reana-client run -w dask
$ sleep 60
$ reana-client status -w dask
$ reana-client ls -w dask
$ reana-client download -w dask dask_demo.res
$ tail dask_demo.res
[0.99483301 0.99318256 1.0114246  ... 0.99163413 0.99748661 1.01648798]

Note

Beware of versioning. In the above example, we are using Dask version 2023.1.0 in both the cluster and the client. If a user happens to use, say, the latest Dask 2023.5.1 in their job image, then the above workflow would fail due to client/cluster version inconsistencies:

$ kubectl logs service/dask-scheduler | tail
  File "/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py", line 5213, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 904, in handle_stream
    handler(**merge(extra, msg))
TypeError: update_graph() got an unexpected keyword argument 'graph_header'
2023-05-31 12:46:26,523 - distributed.scheduler - INFO - Receive client connection: Client-279c10b7-ffb1-11ed-800d-4285cf89ac3c
2023-05-31 12:46:26,523 - distributed.core - INFO - Starting established connection to tcp://10.244.0.43:60864
2023-05-31 12:46:26,624 - distributed.core - INFO - Connection to tcp://10.244.0.43:60864 has been closed.
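One way to surface such mismatches early is Dask's own version check: dask.distributed.Client.get_versions(check=True) compares the package versions seen by the client, the scheduler, and the workers, and raises an error on mismatch. A minimal sketch, reusing the DASK_SCHEDULER_URI convention from the example above:

import os

import dask.distributed

client = dask.distributed.Client(os.getenv("DASK_SCHEDULER_URI", "tcp://127.0.0.1:8080"))

# check=True raises if client, scheduler, and worker package versions disagree.
versions = client.get_versions(check=True)
print(versions["scheduler"]["packages"]["dask"])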

This means that:

  • If a REANA site were to use a statically pre-created Dask cluster for user jobs, then reana-client validate would have to carefully check user images for Dask-related version consistency before accepting user jobs.

  • Ideally though, the user should be able to specify in their reana.yaml the desired Dask resource version, for example:

    resources:
      dask:
        version: 2023.1.0

    REANA would then dynamically create a new individual Dask cluster with the desired version for this concrete user at analysis start-up time, run the analysis, and bring the Dask cluster back down afterwards (see the sketch after this list). All of this will need careful error handling when REANA manages the requested resources.
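For the dynamic per-user scenario, the dask-kubernetes operator could be one building block on the REANA side. A rough sketch, assuming the dask-kubernetes package is available and that the requested version maps to a matching image tag; the function and its wiring are illustrative, not actual REANA code:

from dask_kubernetes.operator import KubeCluster

def start_dask_cluster_for_workflow(workflow_id: str, dask_version: str) -> KubeCluster:
    """Spin up a short-lived Dask cluster for one workflow run (illustrative)."""
    # Pin the cluster image to the Dask version requested in reana.yaml,
    # so that client and cluster versions stay consistent.
    return KubeCluster(
        name=f"reana-dask-{workflow_id}",
        image=f"ghcr.io/dask/dask:{dask_version}",
        n_workers=2,
    )

# The workflow would then run against cluster.scheduler_address, and REANA
# would call cluster.close() once the workflow finishes.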

tiborsimko self-assigned this on May 31, 2023
tiborsimko added this to Dask on Aug 6, 2024
tiborsimko moved this to Backlog in Dask on Aug 6, 2024
tiborsimko (Member) commented

The proof-of-concept has been done, the RFC with the desired features for the Dask integration is being prepared in #823, and we are starting the regular work on implementing the features. Hence I'm closing this proof-of-concept issue.

github-project-automation bot moved this from Backlog to Done in Dask on Aug 23, 2024