[PROPOSAL] Project Galois: Towards Mars 1.0 #1928

qinxuye opened this issue Jan 25, 2021 · 0 comments
Background

Mars has evolved a lot since the first release.

However, some challenges have emerged.

Two Distinct Execution Layers

There exist two distinct execution layers: one for local scheduling, the other for distributed scheduling. In brief, the main differences are as follows.

  • Local scheduling is thread-based, while distributed scheduling is process-based and relies on Mars actors.
  • Distributed scheduling has more functionality and is hence more complicated. Typical functionality includes:
    • A spilling mechanism and stricter memory control, which prevent out-of-memory errors when data is huge.
    • A partitioning algorithm which assigns the initial nodes to different workers.
    • A data transfer service which sends data between workers.
    • A storage layer which stores data in shared memory as well as on disk.
    • A fault tolerance mechanism which ensures that when some workers are gone, the graph can still be computed with the remaining workers.

Local scheduling is simple but lacks functionality that is crucial for stability and extensibility. However, distributed scheduling has some downsides too.

  1. The distributed architecture is too complicated. For instance, on the scheduler side, we have GraphActor which controls the lifecycle of a graph, as well as OperandActor which controls the lifecycle of an operand. The original idea was that each GraphActor can focus on its own graph, and each OperandActor on its own operand. However, this generates quite a few problems. For example,
    • Meta redundancy: the operand info needs to be stored in both GraphActor and OperandActor.
    • The time spent on actor message passing cannot be ignored, so for OperandActors, frequently fetching info about predecessors or successors is costly and may cause info inconsistency.
    • It's hard to implement fault tolerance when a graph relies on another one, because it's hard to track all the dependent graphs and mark which operands need to be executed again due to data loss.
    • When a graph is huge, creating plenty of OperandActors consumes too much time.
  2. There is no concept of a job, only of a graph. Often, identical operands show up in multiple graphs. For instance, if users call (mt.ones((10, 10)) * 2).execute() and then call (mt.ones((10, 10)) + 1).execute(), TensorOnes with the same key will show up twice; however, for an identical operand key, we only create one OperandActor. The status of that OperandActor may become chaotic, especially when the data of the operand is deleted or the operand is failed over.
  3. Not cancel-free. For now, if users want to cancel the current computation, we cannot guarantee that the cancel will succeed or that it will not affect upcoming computations.
  4. Lack of APIs for many components, hence there is quite a lot of information we cannot retrieve.
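
The shared-key problem in item 2 can be reproduced with a small, self-contained simulation. This is only a sketch: operand_key, OperandState and Scheduler are hypothetical stand-ins, and deriving the key from a hash of the operand name and arguments is an assumption made for illustration, not the actual Mars key algorithm.

```python
import hashlib


def operand_key(op_name, *args):
    """Assumed key scheme: same operand name and arguments give the same key."""
    raw = repr((op_name, args)).encode()
    return hashlib.sha256(raw).hexdigest()


class OperandState:
    """Stand-in for the single per-key OperandActor."""

    def __init__(self, key):
        self.key = key
        self.has_data = False


class Scheduler:
    """Keeps one OperandState per key, no matter how many graphs use it."""

    def __init__(self):
        self.operands = {}

    def submit_graph(self, keys):
        for key in keys:
            if key not in self.operands:
                self.operands[key] = OperandState(key)
            self.operands[key].has_data = True  # pretend the operand ran

    def release(self, key):
        self.operands[key].has_data = False


ones_key = operand_key("TensorOnes", (10, 10))
scheduler = Scheduler()
# Graph 1: ones * 2, graph 2: ones + 1. Both contain the same TensorOnes key.
scheduler.submit_graph([ones_key, operand_key("TensorMul", ones_key, 2)])
scheduler.submit_graph([ones_key, operand_key("TensorAdd", ones_key, 1)])

# The first graph finishes and releases its input...
scheduler.release(ones_key)
# ...so the second graph now sees the shared operand without data.
second_graph_input_ready = scheduler.operands[ones_key].has_data
```

Only three operand states exist for the two graphs, and releasing the shared one on behalf of the first graph leaves the second graph without its input.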

Lazy Evaluation and Eager Execution

Currently, most usage of Mars is lazy, that is, users call .execute() each time they want to get data. This is quite annoying because it does not fit users' usual habits. Hence, Mars provides an eager mode which triggers execution automatically once a Mars object, e.g. a tensor or DataFrame, is created.

However, lazy evaluation is more efficient but not convenient to use, while eager execution is convenient but not particularly tuned for performance.
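
The trade-off can be illustrated with a toy model. This is a sketch only: LazyValue and EagerValue are hypothetical classes, not Mars objects, and the runs counter merely stands in for "an execution was triggered".

```python
class LazyValue:
    """Records an expression; nothing runs until execute() is called."""

    runs = 0  # number of executions actually triggered

    def __init__(self, fn):
        self._fn = fn

    def __add__(self, other):
        return LazyValue(lambda: self._fn() + other)

    def __mul__(self, other):
        return LazyValue(lambda: self._fn() * other)

    def execute(self):
        LazyValue.runs += 1
        return self._fn()


class EagerValue:
    """Triggers an execution as soon as each object is created."""

    runs = 0  # number of executions actually triggered

    def __init__(self, value):
        EagerValue.runs += 1
        self.value = value

    def __add__(self, other):
        return EagerValue(self.value + other)

    def __mul__(self, other):
        return EagerValue(self.value * other)


lazy_result = ((LazyValue(lambda: 1) + 1) * 2).execute()
eager_result = ((EagerValue(1) + 1) * 2).value
```

Both styles compute the same value, but the lazy version submits the whole expression once, which leaves room for optimization, while the eager version triggers one execution per intermediate object.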

Number of Workers

A common question is: how many workers are required to start a cluster? Every user would ask this, and it is actually a complicated question, because the number of workers is related to the workload, and the number may change during the whole computation. For instance, more complex computation may arrive which requires more workers, while as the computation goes on, some workers may become idle.

Key Goals to reach 1.0

The key goals to reach 1.0 can be summarized as follows.

  1. Stability. Be able to accomplish the goals below without introducing unpredictable behavior.
    1. Cancel-free: a job can be cancelled successfully at any phase during execution.
    2. Re-executing a tileable object works well whether or not it has been executed before.
    3. Supervisor and worker failover: ensure that a job can be executed successfully when some worker is killed or runs out of memory.
  2. Scaling cluster.
    1. Bringing workers online or offline is guaranteed to work well.
    2. Support restarting workers.
    3. Auto-scale the cluster according to workloads.
  3. Defer mode. Eliminate .execute() and make the Mars ecosystem act just like the data science ecosystem on a single machine.

Proposals in summary

In order to accomplish project Galois and reach the goal of improving stability, I propose a few subprojects.

Oscar: Mars Actors 2.0

Mars Actors is the key component of the entire distributed scheduling. Oscar is the 2.0 version of Mars Actors. It can be implemented separately and co-exist with the 1.0 version (mars/actors) until it is mature.

The main improvements can be summarized as below.

  1. Support stateful as well as stateless actors. Stateful actors can only be allocated on the main process, while stateless actors can be allocated on main and sub processes.
  2. More sophisticated error handling.
  3. Deadlock detection.
  4. API tuning.
  5. Promise integration internally.
  6. Support Ray actors as backend.

For more details, please refer to the proposal of Oscar (#1935).
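
As an illustration of the allocation rule in item 1 only, here is a minimal sketch. ActorPoolSketch, its allocate method and the actor and process names are all hypothetical and are not the Oscar API, which is specified in the Oscar proposal.

```python
from dataclasses import dataclass, field

MAIN_PROCESS = "main"


@dataclass
class ActorPoolSketch:
    """Hypothetical pool with one main process and named sub processes."""

    sub_processes: tuple
    placements: dict = field(default_factory=dict)

    def allocate(self, actor_name, stateful, process=MAIN_PROCESS):
        """Place an actor, enforcing the stateful/stateless rule."""
        if process != MAIN_PROCESS and process not in self.sub_processes:
            raise ValueError(f"unknown process: {process}")
        if stateful and process != MAIN_PROCESS:
            raise ValueError("stateful actors can only live on the main process")
        self.placements[actor_name] = process
        return process


pool = ActorPoolSketch(sub_processes=("sub-0", "sub-1"))
pool.allocate("MetaActor", stateful=True)  # lands on the main process
pool.allocate("ExecutorActor", stateful=False, process="sub-1")

try:
    pool.allocate("BadActor", stateful=True, process="sub-0")
    rejected = False
except ValueError:
    rejected = True  # stateful actor on a sub process is refused
```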

Unified Execution Layer both for Local and Distributed

We need to unify the execution layers into one that is suitable for both local and distributed execution.

I suggest removing thread-based scheduling entirely, because of the important functionality that distributed scheduling provides, including more intelligent and well-controlled task scheduling as well as the spilling mechanism.

Besides, I suggest renaming scheduler nodes to supervisor nodes, because supervisor nodes may be in charge of more responsibilities than scheduling.

Session

For a created cluster, a default session will be created or reused unless users specify a new session. All information about the session, including jobs and graphs, will be kept in a single process so that they can share information without copying any data.

Jobs and Tasks

As mentioned before, there is only a graph abstraction. When a user calls .execute(), a coarse-grained graph which consists of DataFrames, tensors etc. is submitted to the Mars schedulers. The schedulers then tile the coarse-grained graph into a fine-grained graph which consists of chunks. A few actors are involved in this process. First, a GraphActor receives the graph and performs tiling, initial assignment and so on. Second, OperandActors are created by the GraphActor, and each OperandActor is in charge of the lifecycle of one operand. However, GraphActors and OperandActors are allocated on different processes, even different schedulers, so they cannot share data easily. Thus, some info about an operand may be copied to both the GraphActor and the OperandActor, and the data may become inconsistent if one copy changes. Besides, different GraphActors may create OperandActors with the same operand key. Because an OperandActor with a given key is created only once, its status becomes chaotic in this situation. Consider that in the first graph the OperandActor has been told to release data while the second graph is still using the data: the second graph may fail due to data loss.

Thus the abstractions of job and task can be introduced. Each job consists of a graph. All jobs in a session are allocated on the same process, so they can share graphs without copying, and an operand's inputs can be tracked even if they are in different jobs. A task stands for a subgraph that is committed to a worker.
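
A minimal sketch of how jobs living in one session process could track shared operands is given below. Task, Job and Session here are hypothetical classes, and the round-robin worker assignment and per-session reference counter are assumptions made for illustration, not the proposed implementation.

```python
from collections import Counter
from dataclasses import dataclass, field


@dataclass
class Task:
    """A subgraph (here a list of operand keys) committed to one worker."""

    operand_keys: list
    worker: str


@dataclass
class Job:
    job_id: str
    operand_keys: list
    tasks: list = field(default_factory=list)


class Session:
    """Holds every job of the session in one process, so state is shared."""

    def __init__(self):
        self.jobs = {}
        self.ref_counts = Counter()  # operand key -> number of jobs using it

    def submit(self, job_id, operand_keys, workers):
        job = Job(job_id, list(operand_keys))
        for i, key in enumerate(job.operand_keys):
            self.ref_counts[key] += 1
            # Assumed placement: one operand per task, round-robin over workers.
            job.tasks.append(Task([key], workers[i % len(workers)]))
        self.jobs[job_id] = job
        return job

    def finish(self, job_id):
        """Return the keys whose data no remaining job depends on."""
        releasable = []
        for key in self.jobs.pop(job_id).operand_keys:
            self.ref_counts[key] -= 1
            if self.ref_counts[key] == 0:
                del self.ref_counts[key]
                releasable.append(key)
        return releasable


session = Session()
session.submit("job-1", ["ones", "mul"], ["worker-0", "worker-1"])
session.submit("job-2", ["ones", "add"], ["worker-0", "worker-1"])
released_after_job1 = session.finish("job-1")
released_after_job2 = session.finish("job-2")
```

Because both jobs are visible in the same process, the shared "ones" operand is kept alive until the second job finishes, which avoids the data loss scenario described above.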

Services

A service is a basic component of a Mars cluster. A service is an isolated unit that may span supervisor nodes and worker nodes, and it must have APIs both for actors and for the web to retrieve information. A service can only talk to another service via APIs, instead of direct actor or REST calls.

Directory /mars/services/ could be the right place for source code.

In each service, a few directories are required to keep the layout easy to understand.

|- services
   |- service_a
     |- supervisor   # actors and how they can be initialized on supervisor nodes
     |- worker   # actors and how they can be initialized on worker nodes
     |- config   # configuration related to this service
     |- apis     # API definitions
       |- base
       |- web    # Web API
       |- mock   # Mock API for test purpose
       |- main   # Call specific API according to context
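
To make the apis layout more concrete, the following sketch shows one possible shape for the base, mock, web and main pieces. All class and function names, the "test" and "web" context strings, and the chunk meta fields are hypothetical; the web client is a placeholder with no transport.

```python
from abc import ABC, abstractmethod


class MetaAPIBase(ABC):
    """apis/base: the interface other services program against."""

    @abstractmethod
    def set_chunk_meta(self, chunk_id, meta):
        ...

    @abstractmethod
    def get_chunk_meta(self, chunk_id):
        ...


class MockMetaAPI(MetaAPIBase):
    """apis/mock: in-memory implementation for tests."""

    def __init__(self):
        self._metas = {}

    def set_chunk_meta(self, chunk_id, meta):
        self._metas[chunk_id] = dict(meta)

    def get_chunk_meta(self, chunk_id):
        return self._metas[chunk_id]


class WebMetaAPI(MetaAPIBase):
    """apis/web: placeholder for a client that would call a web endpoint."""

    def __init__(self, endpoint):
        self.endpoint = endpoint

    def set_chunk_meta(self, chunk_id, meta):
        raise NotImplementedError("HTTP transport is out of scope for this sketch")

    def get_chunk_meta(self, chunk_id):
        raise NotImplementedError("HTTP transport is out of scope for this sketch")


def create_meta_api(context, endpoint=None):
    """apis/main: choose the implementation according to the calling context."""
    if context == "test":
        return MockMetaAPI()
    if context == "web":
        return WebMetaAPI(endpoint)
    raise ValueError(f"unsupported context: {context}")


api = create_meta_api("test")
api.set_chunk_meta("chunk-1", {"shape": (10, 10), "status": "finished"})
chunk_shape = api.get_chunk_meta("chunk-1")["shape"]
```

Callers depend only on the base interface, so a service can be tested against the mock without starting actors or a web server.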

Services include:

  1. Meta service (consider supervisor node failover) (0.7). Responsibilities include:

    • For supervisor nodes,

      • Record data metas. A record can include id, type, dtypes, shape, extra_info, status, workers.
      • Record job and task status. A record can include id, type, status, start_time, end_time, is_recover.
      • Dump data into a filesystem periodically so that the entire meta is recoverable.
      • Load dumped data when recovering from a supervisor node failure.
    • For worker nodes,

      • Record data metas. A record can include id, type, dtypes, shape, extra_info, status.
      • Record task status. A record can include id, status, start_time, end_time, is_recover.
  2. Lifecycle service (consider supervisor node failover) (0.7).

    • Record the reference count for each tileable and chunk data, on both supervisor and worker nodes.
    • Delete chunk data if its reference count reaches 0.
    • Dump data into a filesystem periodically to make the lifecycle service recoverable.
    • Load dumped data when recovering from a supervisor node failure.
  3. ClusterManager service (0.7).

    • Monitor the cluster status.
    • Tell the failover service to process the failure if some worker crashes.
    • Support taking a worker offline, making sure that its data has been migrated somewhere else.
    • Start watching new workers, or stop watching workers, when the auto-scaling service scales out or in.
    • Blocklist functionality that blocks some worker nodes from connecting.
    • Record the health of the entire cluster, for instance, "Is recovering failover from worker lost" or "Everything works normal".
  4. Task service (consider supervisor node failover) (0.7).

    • Accept a coarse-grained graph, a.k.a. tileable graph, from the client, save it to the recovery filesystem, and tile it to get a fine-grained, a.k.a. tiled, graph.
    • Perform initial node partitioning as well as optimizations like node fusion.
    • Generate a task graph according to the tiled graph; each task owns an operand.
    • Submit the task graph to the scheduling service.
    • Dump the task graph periodically so that it can be recovered.
    • Accept an updated tileable graph, usually submitted by user-defined functions.
    • Support pause, resume and cancel operations.
  5. JobAnalyzer service (0.7).

    • Accept a task graph, analyze it, then prioritize tasks and assign them to workers according to a strategy.
    • Partition the graph.
  6. Scheduling service (0.7).

    • Workers can implement a hierarchical scheduling strategy (NUMA, GPU).
    • Support cancelling.
    • Work stealing can be implemented in this service. (0.8)
  7. Storage service (0.7).

    • Storage lib. (see [PROPOSAL] Mars storage lib #1905)
    • The storage service is worker-level only.
    • Data transfer and the spill mechanism are integrated into the storage service; other services shouldn't need to consider the existence of spilling or data location.
    • Support migrating chunk data from one worker to another.
    • Tell the meta service to modify the meta when a data transfer is accomplished.
    • The high-level API includes get, put, list, prepare.
    • The low-level API includes create_buffer, seal.
    • Support dumping all data into some filesystem.
  8. Shuffle service (0.8).

    • Manage shuffle execution.
    • Handle data location (shuffle meta).
    • Shuffle data transfer.
    • Combine data when shuffled data is generated.
    • May deliver data directly downstream, depending on the situation.
  9. Metrics service (0.7).

    • Record metrics for worker nodes, like the running time of operands, transfer speed etc.
    • The metrics service on a supervisor node receives aggregated metrics from workers and gives a brief aggregated result.
    • Prometheus integration.
  10. Failover service (0.8).

    • While the failover service is working, all new jobs will be held until the supervisor and worker nodes are guaranteed to have recovered.

    • For failover of supervisor nodes: when a supervisor node has restarted, the failover service will be told that this supervisor node has recovered, and a couple of things will be done.

      1. The meta, lifecycle and job services will try to restore the previous information from the recovery filesystem. Then the failover service will check whether all jobs have been loaded; if not, it processes the jobs that didn't dump data in time.
      2. Check finished and running jobs, and check chunk data status with workers. If some data is lost, reschedule the tasks that can regenerate the lost data.
    • For loss of worker nodes:

      1. Clean out all the tasks scheduled in the scheduling service.
      2. Check the meta service to find lost chunk data, and scan finished and running jobs to reschedule the tasks that can regenerate the lost data.
  11. Auto-scaling service (0.8).

    • If the cluster is configured as auto-scaling, this service will decide whether to scale the worker count out or in.
    • Scale-in is mostly due to idle workers, while scale-out is mostly due to the requirements of a new job or a rather high load on the current workers.
    • If scale-out is decided, tell the cluster to scale to the target size. When finished, tell the ClusterManager service that these workers are ready and clear out all scheduled tasks; all running jobs will then repartition their initial nodes and reschedule their tasks.
    • If scale-in is decided, first decide which workers to shut down, then tell those workers to transfer their data to the remaining workers or to some external storage. When data migration has finished, shut down those workers and tell the ClusterManager service to unregister them.
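
The scale-in steps of the auto-scaling service can be sketched as two small functions. This is an illustration under stated assumptions: plan_scale_in and scale_in are hypothetical names, the idle threshold and minimum worker count are made-up parameters, and migrating each chunk to the least-loaded remaining worker is just one possible policy.

```python
def plan_scale_in(worker_loads, idle_threshold=0.1, min_workers=1):
    """Pick idle workers to shut down while keeping at least min_workers."""
    idle = sorted(w for w, load in worker_loads.items() if load <= idle_threshold)
    max_removable = max(len(worker_loads) - min_workers, 0)
    return idle[:max_removable]


def scale_in(worker_data, to_remove):
    """Migrate chunks off the removed workers and return the new placement.

    worker_data maps a worker name to the set of chunk keys it holds.
    """
    remaining = {
        worker: set(chunks)
        for worker, chunks in worker_data.items()
        if worker not in to_remove
    }
    if not remaining:
        raise ValueError("cannot remove every worker")
    for worker in to_remove:
        for chunk in sorted(worker_data[worker]):
            # Assumed policy: send the chunk to the worker holding the fewest chunks.
            target = min(remaining, key=lambda w: (len(remaining[w]), w))
            remaining[target].add(chunk)
    return remaining


loads = {"w0": 0.9, "w1": 0.0, "w2": 0.05}
data = {"w0": {"a"}, "w1": {"b", "c"}, "w2": set()}

to_remove = plan_scale_in(loads)
new_placement = scale_in(data, to_remove)
```

Only after the migration has produced the new placement would the removed workers be shut down and unregistered from the ClusterManager service.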