You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Covalent now supports dispatch cancellation. Executor plugins can opt in to this functionality by registering a job identifier for each task (in this case the job id or ARN) and implementing a cancel method. This method will be invoked by Covalent when the user requests that the task be cancelled.
Methods provided by Covalent
All executor plugins automatically implement the following methods to be invoked in the plugin's run() method.
asyncdefset_job_handle(self, job_handle)
saves a job identifier (job_handle) in Covalent's database. The job_handle can be any JSONable type, typically a string or integer, and should contain whatever information is needed to cancel the job later. In this case, the ECS stop_task method expects the task id assigned by ECS.
asyncdefget_cancel_requested(self) ->bool
queries Covalent if task cancellation has been requested. This can be called at various junctures in the run() method if desired.
The run() method should raise a TaskCancelledError exception upon ascertaining that the backend job has been cancelled.
Methods to be implemented by each plugin:
Each plugin defines the following abstract method to be overridden with backend-specific logic:
Upon receiving a cancellation request, the Covalent server will invoke this with the following inputs:
task_metadata: currently contains the keys dispatch_id and node_id as for the run() method.
job_handle: the task's job identifier previously saved using set_job_handle().
In addition to querying Covalent for cancellation requests, the run() method may need to query the compute backend to determine whether the job is in fact cancelled and raise TaskCancelledError if that is the case.
The code below provides a rough guide to using the above methods
fromcovalent._shared_files.exceptionsimportTaskCancelledError
...
asyncdefproceed_if_task_not_cancelled(self):
ifawaitself.get_cancel_requested():
self._debug_log(f"Task Cancelled")
raiseTaskCancelledError(f"Batch job {batch_job_name} cancelled")
asyncdefrun(self, function: Callable, args: List, kwargs: Dict, task_metadata: Dict) ->Any:
...
awaitself.proceed_if_task_not_cancelled()
# upload pickled assets
...
awaitself.proceed_if_task_not_cancelled()
# invoke job/taskawaitself.set_job_handle(handle=job_handle)
...
# await self.poll_job_status(task_metadata, job_handle)asyncdefpoll_job_status(self, task_metadata, job_id):
# Boto3 client invocations to check the job status# while timeout_not_exceeded:# job_state = client.describe_job(job_id) # if job_state == "CANCELLED":# raise TaskCancelledError# ...# await asyncio.sleep(poll_freq)asyncdefcancel(self, task_metadata: Dict, job_handle: str) ->None:
""" Cancel the batch job Arg(s) task_metadata: Dictionary with the task's dispatch_id and node id job_handle: Unique job handle assigned to the task by Batch Return(s) True/False indicating if the job was cancelled """# boto client invocations to cancel the task
...
ifjob_cancelled:
returnTrueelse:
# Job cannot be cancelled for one reason or anotherreturnFalse
Acceptance Criteria
In the run() method:
Save the job handle for the task once that is determined.
Check whether the job has been cancelled at various junctures
Implement cancel method
Ensure that the a workflow is tested with cancellation to ensure cancel functionality correctly integrated
The text was updated successfully, but these errors were encountered:
Description
Covalent now supports dispatch cancellation. Executor plugins can opt in to this functionality by registering a job identifier for each task (in this case the job id or ARN) and implementing a
cancel
method. This method will be invoked by Covalent when the user requests that the task be cancelled.Methods provided by Covalent
All executor plugins automatically implement the following methods to be invoked in the plugin's
run()
method.saves a job identifier (
job_handle
) in Covalent's database. Thejob_handle
can be any JSONable type, typically a string or integer, and should contain whatever information is needed to cancel the job later. In this case, the ECSstop_task
method expects the task id assigned by ECS.queries Covalent if task cancellation has been requested. This can be called at various junctures in the
run()
method if desired.The
run()
method should raise aTaskCancelledError
exception upon ascertaining that the backend job has been cancelled.Methods to be implemented by each plugin:
Each plugin defines the following abstract method to be overridden with backend-specific logic:
Upon receiving a cancellation request, the Covalent server will invoke this with the following inputs:
task_metadata
: currently contains the keysdispatch_id
andnode_id
as for therun()
method.job_handle
: the task's job identifier previously saved usingset_job_handle()
.In addition to querying Covalent for cancellation requests, the
run()
method may need to query the compute backend to determine whether the job is in fact cancelled and raiseTaskCancelledError
if that is the case.The code below provides a rough guide to using the above methods
Acceptance Criteria
run()
method:job handle
for the task once that is determined.cancel
methodcancel
functionality correctly integratedThe text was updated successfully, but these errors were encountered: