Motivations
Mars currently handles all subtasks within a single supervisor. When the number of subtasks or workers is large, this puts a huge load on the supervisor node. What's more, scheduling solely on the supervisor side introduces considerable latency between worker tasks. Moving subtask scheduling into workers alleviates both issues.
Overall design
This design lets workers schedule the subtasks submitted to them, while supervisors act only as batch assigners and coordinators. Subtasks created by TaskService are assigned and pushed to workers. Inside workers, subtasks are queued and executed according to the priorities assigned to them. Results are then fed back to supervisors to activate their successors.
Subtask submission
When subtasks are generated, the assigned supervisor assigns and pushes all ready subtasks to the corresponding workers. Unlike the previous design, the supervisor no longer decides how many subtasks to submit to workers based on global slot information, nor does it maintain queues of subtasks. Workers decide which subtasks to run based on their own slot and storage state, leading to faster reaction and narrower gaps between executions.
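A minimal, standalone sketch of this "assign everything and push" behaviour is shown below. The Subtask structure and the assign_band policy are illustrative assumptions, not the actual Mars scheduling service APIs; in Mars the push would be an actor call to the worker's queue actor, and the assigner would also consider data locality.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class Subtask:
    subtask_id: str
    priority: Tuple[int, ...]
    expect_band: Optional[Tuple[str, str]] = None  # (worker address, band name)


def assign_band(subtask: Subtask, bands: List[Tuple[str, str]]) -> Tuple[str, str]:
    # Hypothetical assignment policy: honour the expected band when present,
    # otherwise hash the subtask onto a band.
    if subtask.expect_band is not None:
        return subtask.expect_band
    return bands[hash(subtask.subtask_id) % len(bands)]


def push_ready_subtasks(
    ready: List[Subtask], bands: List[Tuple[str, str]]
) -> Dict[Tuple[str, str], List[Subtask]]:
    # Group all ready subtasks by their assigned band and push every group in
    # one batch: no supervisor-side slot bookkeeping and no supervisor-side queue.
    per_band: Dict[Tuple[str, str], List[Subtask]] = defaultdict(list)
    for subtask in ready:
        per_band[assign_band(subtask, bands)].append(subtask)
    for band, subtasks in per_band.items():
        print(f"pushing {len(subtasks)} subtask(s) to band {band}")
    return per_band
```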
Subtask queueing
Inside workers, we use queues with latches to order and control subtasks. Such a queue can be seen as the combination of a priority queue deciding the order of subtasks and a semaphore deciding the number of subtasks to output. The default value of the semaphore equals the number of slots of the given band. The basic API of the queue is shown below:
```python
class SubtaskPriorityQueueActor(mo.StatelessActor):
    @mo.extensible
    def put(self, subtask_id: str, band_name: str, priority: Tuple):
        """Put a subtask ID into the queue."""

    @mo.extensible
    def update_priority(self, subtask_id: str, band_name: str, priority: Tuple):
        """Update the priority of the given subtask."""

    async def get(self, band_name: str):
        """
        Get an item from the queue and return the subtask ID and slot ID.
        Will wait when the queue is empty, or the value of the semaphore is zero.
        """

    def release_slot(self, subtask_id: str, errors: str = "raise"):
        """Return the slot occupied by the given subtask and increase the value of the semaphore."""

    @mo.extensible
    def remove(self, subtask_id: str):
        """Remove a subtask from the queue. If the subtask is occupying a slot, the slot is also released."""
```
More APIs can be added to implement operations like yield_slot.
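As a concrete illustration of the priority-queue-plus-semaphore idea, here is a minimal standalone sketch built on heapq and asyncio. It is not the Mars actor above: update_priority, band handling and slot IDs are omitted, and remove() here only drops queued entries. It only shows how get() blocks on both the semaphore and the queue contents, while release_slot() opens the latch again.

```python
import asyncio
import heapq
import itertools
from typing import Tuple


class SubtaskPriorityQueue:
    """Sketch of a queue with a latch: a heap orders subtasks by priority,
    while a semaphore initialised to the band's slot count limits how many
    subtasks get() hands out before release_slot() is called."""

    def __init__(self, n_slots: int):
        self._heap = []                       # entries: (negated priority, seq, subtask_id)
        self._seq = itertools.count()
        self._slots = asyncio.Semaphore(n_slots)
        self._has_items = asyncio.Event()
        self._removed = set()

    def put(self, subtask_id: str, priority: Tuple):
        # heapq is a min-heap, so negate the priority to pop the largest first.
        heapq.heappush(self._heap, (tuple(-p for p in priority), next(self._seq), subtask_id))
        self._has_items.set()

    async def get(self) -> str:
        # Wait for a free slot first, then for a queued subtask.
        await self._slots.acquire()
        while True:
            while not self._heap:
                self._has_items.clear()
                await self._has_items.wait()
            _, _, subtask_id = heapq.heappop(self._heap)
            if subtask_id in self._removed:
                self._removed.discard(subtask_id)
                continue
            return subtask_id

    def release_slot(self):
        # The subtask no longer occupies its slot; let the next get() proceed.
        self._slots.release()

    def remove(self, subtask_id: str):
        # Lazy removal: the entry is skipped when it is eventually popped.
        self._removed.add(subtask_id)
```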
To parallelize IO and CPU costs, two queues are set up inside the worker (see the sketch after the list below).
PrepareQueue: the queue of submitted subtasks. A prepare task consumes items of this queue and performs quota allocation as well as data movement. When a subtask starts execution, its slot in this queue is released.
ExecutionQueue: the queue of prepared subtasks. An execution task consumes items of this queue and performs the execution. When a subtask finishes execution, its slot is released.
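The wiring between the two queues could look like the sketch below, which reuses the SubtaskPriorityQueue sketch above. The allocate_quota, move_data and run_subtask callables are stand-ins for the quota, storage and execution services (assumed names, not Mars APIs); the important part is where each slot is released: the PrepareQueue slot when execution starts, the ExecutionQueue slot when execution finishes.

```python
from typing import Awaitable, Callable, Dict, Tuple


async def prepare_loop(
    prepare_queue: "SubtaskPriorityQueue",
    execution_queue: "SubtaskPriorityQueue",
    priorities: Dict[str, Tuple],
    allocate_quota: Callable[[str], Awaitable[None]],
    move_data: Callable[[str], Awaitable[None]],
):
    while True:
        subtask_id = await prepare_queue.get()
        await allocate_quota(subtask_id)   # reserve memory quota for the subtask
        await move_data(subtask_id)        # fetch remote dependencies into local storage
        execution_queue.put(subtask_id, priorities[subtask_id])


async def execution_loop(
    prepare_queue: "SubtaskPriorityQueue",
    execution_queue: "SubtaskPriorityQueue",
    run_subtask: Callable[[str], Awaitable[None]],
):
    while True:
        subtask_id = await execution_queue.get()
        # Execution starts, so the subtask's PrepareQueue slot can serve the next one.
        prepare_queue.release_slot()
        try:
            await run_subtask(subtask_id)
        finally:
            # Execution finished (or failed); release the ExecutionQueue slot.
            execution_queue.release_slot()
```

In practice one prepare loop and one execution loop would run per band as long-lived tasks on the worker.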
Successor forwarding
When a subtask finishes execution and we need to choose another subtask to run, there are two kinds of subtasks to schedule: subtasks already enqueued in ExecutionQueue, and subtasks whose predecessors were just satisfied by the execution that finished. The latter group often have higher priority but no data prepared, and may not be scheduled in time because of the latency introduced by the queues. We design a successor forwarding mechanism to resolve this.
When pushing ready subtasks to the scheduling service, their successors are also pushed so they can be cached on the worker. The scheduling service decides which worker each subtask belongs to and pushes it there. A successor can only be forwarded when the conditions below are satisfied:
The successor is cached in the worker
All data the successor depends on is already stored in the worker, so we do not need to consult the Meta or Storage service for remote data retrieval
There is enough quota for the successor
When all conditions are met, PrepareQueue is skipped and the subtask is inserted into ExecutionQueue directly. When the slot is released, the successor will be scheduled as soon as possible.
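The forwarding decision itself boils down to the three checks above. The sketch below uses assumed names (cached_successors, local_data_keys, quota_left and so on) rather than real Mars service calls; a successor that passes all checks is enqueued straight into the execution queue.

```python
from typing import Dict, Set, Tuple


def try_forward_successor(
    successor_id: str,
    cached_successors: Set[str],            # successors pushed along with their predecessor
    dependency_keys: Dict[str, Set[str]],   # successor ID -> data keys it depends on
    local_data_keys: Set[str],              # data keys already in this worker's storage
    quota_left: int,
    quota_needed: Dict[str, int],
    priorities: Dict[str, Tuple],
    execution_queue: "SubtaskPriorityQueue",
) -> bool:
    if successor_id not in cached_successors:
        return False                         # the successor is not cached on this worker
    if not dependency_keys[successor_id] <= local_data_keys:
        return False                         # some inputs still live on other workers
    if quota_needed[successor_id] > quota_left:
        return False                         # not enough quota for the successor
    # All three conditions hold: skip PrepareQueue and enqueue for execution directly.
    execution_queue.put(successor_id, priorities[successor_id])
    return True
```

A successor that fails any check simply stays on the normal path and is scheduled through PrepareQueue as usual.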
Possible impacts
Autoscale
The current autoscale implementation is based on the queueing mechanism on the supervisor side, and must be redesigned around worker-side scheduling.
Fault Injection
As the worker side of the scheduling service is rewritten, fault injection needs to adapt to that change.