[BUG]Test on ray cluster: TypeError: __init__() missing 1 required positional argument: 'pid' #3197

Open
guodashun opened this issue Jul 28, 2022 · 5 comments

Comments

@guodashun

Describe the bug
When I run the example code from the Mars documentation, I get the error message shown below.
My code looks like this:

import ray
import time

import mars
import mars.tensor as mt
import mars.dataframe as md

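# Seconds to wait between connection attempts (value assumed; not defined in the original snippet).
RAY_WORKER_DELAY = 5

def ray_init():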
    while True:
        try:
            print(ray.init(address='xx.xx.xx.xx:9999',
                           log_to_driver=True,
                           ignore_reinit_error=True,
                           ))
            break
        except ConnectionError:
            print("Ray head is not ready yet, retry")
            time.sleep(RAY_WORKER_DELAY)

ray_init()
cluster = mars.new_cluster_in_ray(worker_num=2, worker_mem=2 * 1024 ** 3)
mt.random.RandomState(0).rand(1000_000000, 5).sum().execute()
df = md.DataFrame(
    mt.random.rand(1000_0000, 4, chunk_size=500_0000),
    columns=list('abcd'))
print(df.sum().execute())
print(df.describe().execute())
# Convert mars dataframe to ray dataset
ds = md.to_ray_dataset(df)
print(ds.schema(), ds.count())
ds.filter(lambda row: row["a"] > 0.5).show(5)

To Reproduce
To help us reproduce this bug, please provide the information below:

  1. Your Python version: 3.8.13
  2. The version of Mars you use: 0.9.0
  3. Versions of crucial packages, such as numpy, scipy and pandas
    i. numpy: 1.23.0
    ii. ray: 1.13.0
    iii. scipy: 1.8.1
    iv. pandas: 1.4.2
  4. Full stack of the error.
(RaySubPool pid=27115) 2022-07-28 10:58:23,066  ERROR serialization.py:342 -- __init__() missing 1 required positional argument: 'pid'
(RaySubPool pid=27115) Traceback (most recent call last):
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 340, in deserialize_objects
(RaySubPool pid=27115)     obj = self._deserialize_object(data, metadata, object_ref)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/oscar/backends/ray/communication.py", line 169, in _deserialize_object
(RaySubPool pid=27115)     value = _ray_deserialize_object(self, data, metadata, object_ref)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 237, in _deserialize_object
(RaySubPool pid=27115)     return self._deserialize_msgpack_data(data, metadata_fields)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 192, in _deserialize_msgpack_data
(RaySubPool pid=27115)     python_objects = self._deserialize_pickle5_data(pickle5_data)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 182, in _deserialize_pickle5_data
(RaySubPool pid=27115)     obj = pickle.loads(in_band)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/lib/tblib/pickling_support.py", line 26, in unpickle_exception
(RaySubPool pid=27115)     inst = func(*args)
(RaySubPool pid=27115) TypeError: __init__() missing 1 required positional argument: 'pid'
(RaySubPool pid=27115) Get object ObjectRef(a9e08a2346149af818688fe9862d1fe4520f05550900000001000000) from ray://ray-cluster-1658977074/2/0 failed, got exception System error: __init__() missing 1 required positional argument: 'pid'
(RaySubPool pid=27115) traceback: Traceback (most recent call last):
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 340, in deserialize_objects
(RaySubPool pid=27115)     obj = self._deserialize_object(data, metadata, object_ref)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/oscar/backends/ray/communication.py", line 169, in _deserialize_object
(RaySubPool pid=27115)     value = _ray_deserialize_object(self, data, metadata, object_ref)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 237, in _deserialize_object
(RaySubPool pid=27115)     return self._deserialize_msgpack_data(data, metadata_fields)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 192, in _deserialize_msgpack_data
(RaySubPool pid=27115)     python_objects = self._deserialize_pickle5_data(pickle5_data)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 182, in _deserialize_pickle5_data
(RaySubPool pid=27115)     obj = pickle.loads(in_band)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/lib/tblib/pickling_support.py", line 26, in unpickle_exception
(RaySubPool pid=27115)     inst = func(*args)
(RaySubPool pid=27115) TypeError: __init__() missing 1 required positional argument: 'pid'
(RaySubPool pid=27115) .
(RaySubPool pid=27115) Traceback (most recent call last):
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/oscar/backends/ray/communication.py", line 285, in handle_task
(RaySubPool pid=27115)     result = await object_ref
(RaySubPool pid=27115) ray.exceptions.RaySystemError: System error: __init__() missing 1 required positional argument: 'pid'
(RaySubPool pid=27115) traceback: Traceback (most recent call last):
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 340, in deserialize_objects
(RaySubPool pid=27115)     obj = self._deserialize_object(data, metadata, object_ref)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/oscar/backends/ray/communication.py", line 169, in _deserialize_object
(RaySubPool pid=27115)     value = _ray_deserialize_object(self, data, metadata, object_ref)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 237, in _deserialize_object
(RaySubPool pid=27115)     return self._deserialize_msgpack_data(data, metadata_fields)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 192, in _deserialize_msgpack_data
(RaySubPool pid=27115)     python_objects = self._deserialize_pickle5_data(pickle5_data)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/ray/serialization.py", line 182, in _deserialize_pickle5_data
(RaySubPool pid=27115)     obj = pickle.loads(in_band)
(RaySubPool pid=27115)   File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/lib/tblib/pickling_support.py", line 26, in unpickle_exception
(RaySubPool pid=27115)     inst = func(*args)
(RaySubPool pid=27115) TypeError: __init__() missing 1 required positional argument: 'pid'

  5. Minimized code to reproduce the error.

Expected behavior
A clear and concise description of what you expected to happen.

Additional context
Issue #2860 reports a similar error message.

@fyrestone
Contributor

@chaokunyang @wjsi It seems that tblib has a compatibility issue with RayError?
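
A minimal sketch of how a `missing 1 required positional argument` error can arise during unpickling, assuming the exception being restored takes a required constructor argument that is not stored in `args`. The class names `WorkerGone` and `PicklableWorkerGone` are hypothetical and are not part of Ray, Mars, or tblib; whether this is the actual cause of the failure in this issue is not confirmed by the thread.

```python
import pickle


class WorkerGone(Exception):
    """Hypothetical exception for illustration; not a Ray or Mars class."""

    def __init__(self, message, pid):
        # Only `message` reaches Exception.args; `pid` is kept as an attribute.
        super().__init__(message)
        self.pid = pid


class PicklableWorkerGone(WorkerGone):
    """Same exception, but it tells pickle how to rebuild itself."""

    def __reduce__(self):
        # Supply every required constructor argument, not just self.args.
        return (type(self), (self.args[0], self.pid))


def round_trip(exc):
    """Pickle and unpickle `exc`; return the restored object or the TypeError raised."""
    try:
        return pickle.loads(pickle.dumps(exc))
    except TypeError as err:
        return err


failure = round_trip(WorkerGone("worker exited", pid=27115))
restored = round_trip(PicklableWorkerGone("worker exited", pid=27115))

print(type(failure).__name__, "-", failure)
print(type(restored).__name__, restored.pid)
```

By default an exception is rebuilt by calling its class with `self.args`, so any required argument missing from `args` makes the constructor call fail on the receiving side. Overriding `__reduce__` (or passing all arguments to `super().__init__`) is one common way to avoid this.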

@fyrestone
Contributor

Could you try this?

import ray
import time

import mars
import mars.tensor as mt
import mars.dataframe as md

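# Seconds to wait between connection attempts (value assumed; not defined in the original snippet).
RAY_WORKER_DELAY = 5

def ray_init():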
    while True:
        try:
            print(ray.init(address='xx.xx.xx.xx:9999',
                           log_to_driver=True,
                           ignore_reinit_error=True,
                           ))
            break
        except ConnectionError:
            print("Ray head is not ready yet, retry")
            time.sleep(RAY_WORKER_DELAY)

ray_init()
# cluster = mars.new_cluster_in_ray(worker_num=2, worker_mem=2 * 1024 ** 3)
mars.new_session(backend="ray")
mt.random.RandomState(0).rand(1000_000000, 5).sum().execute()
df = md.DataFrame(
    mt.random.rand(1000_0000, 4, chunk_size=500_0000),
    columns=list('abcd'))
print(df.sum().execute())
print(df.describe().execute())
# Convert mars dataframe to ray dataset
ds = md.to_ray_dataset(df)
print(ds.schema(), ds.count())
ds.filter(lambda row: row["a"] > 0.5).show(5)

@guodashun
Author

@fyrestone Thanks for the reply.
I've tried the new code and the 'pid' error seems to be fixed.
But another error is raised at print(df.describe().execute()):

Traceback (most recent call last):
  File "test_in_cluster.py", line 86, in <module>
    print(df.describe().execute())
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/core/entity/tileables.py", line 462, in execute
    result = self.data.execute(session=session, **kw)
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/core/entity/executable.py", line 144, in execute
    return execute(self, session=session, **kw)
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/deploy/oscar/session.py", line 1890, in execute
    return session.execute(
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/deploy/oscar/session.py", line 1684, in execute
    execution_info: ExecutionInfo = fut.result(
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/deploy/oscar/session.py", line 1870, in _execute
    await execution_info
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/deploy/oscar/session.py", line 105, in wait
    return await self._aio_task
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/deploy/oscar/session.py", line 953, in _run_in_background
    raise task_result.error.with_traceback(task_result.traceback)
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/services/task/supervisor/processor.py", line 365, in run
    await self._process_stage_chunk_graph(*stage_args)
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/services/task/supervisor/processor.py", line 243, in _process_stage_chunk_graph
    chunk_to_result = await self._executor.execute_subtask_graph(
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/services/task/execution/ray/executor.py", line 366, in execute_subtask_graph
    output_keys = self._get_subtask_output_keys(subtask_chunk_graph)
  File "/home/my_project/anaconda3/envs/my_project/lib/python3.8/site-packages/mars/services/task/execution/ray/executor.py", line 550, in _get_subtask_output_keys
    raise NotImplementedError(
NotImplementedError: The shuffle operands are not supported by the ray executor.

What confuses me is that the initial code I provided runs this line successfully, with the following output:

                  a             b             c             d
count  1.000000e+07  1.000000e+07  1.000000e+07  1.000000e+07
mean   5.000693e-01  4.999776e-01  5.000117e-01  5.001075e-01
std    2.887084e-01  2.886315e-01  2.886625e-01  2.887464e-01
min    5.400421e-08  3.773959e-08  2.246664e-08  3.552102e-08
25%    2.501076e-01  2.499848e-01  2.500975e-01  2.500542e-01
50%    5.001653e-01  5.001005e-01  5.000041e-01  5.001642e-01
75%    7.502280e-01  7.499122e-01  7.499833e-01  7.501780e-01
max    9.999998e-01  1.000000e+00  9.999999e-01  1.000000e+00

@fyrestone
Contributor

The latest master should work. The feature is not included in any previous release, so the documentation has not been updated yet.

@guodashun
Author

@fyrestone Cool! The latest master works well!
