This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

TimeoutError: Deadline exceeded on activities taking greater than 120 seconds #15

Open
dkryptr opened this issue May 15, 2021 · 8 comments

Comments

@dkryptr
Contributor

dkryptr commented May 15, 2021

Here's the stack trace.

2021-05-13 13:22:37,579 | ERROR | retry.py:retry_loop:29 | run failed: Deadline exceeded, retrying in 3 seconds
Traceback (most recent call last):
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/grpclib/client.py", line 360, in recv_initial_metadata
    headers = await self._stream.recv_headers()
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/grpclib/protocol.py", line 349, in recv_headers
    await self.headers_received.wait()
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/asyncio/locks.py", line 309, in wait
    await fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/temporal/retry.py", line 17, in retry_loop
    await fp(*args, **kwargs)
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/temporal/decision_loop.py", line 1083, in run
    decision_task: PollWorkflowTaskQueueResponse = await self.poll()
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/temporal/decision_loop.py", line 1165, in poll
    task = await self.service.poll_workflow_task_queue(
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/temporal/api/workflowservice/v1.py", line 828, in poll_workflow_task_queue
    return await self._unary_unary(
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/betterproto/__init__.py", line 1133, in _unary_unary
    response = await stream.recv_message()
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/grpclib/client.py", line 408, in recv_message
    await self.recv_initial_metadata()
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/grpclib/client.py", line 380, in recv_initial_metadata
    self.initial_metadata = im
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/grpclib/utils.py", line 70, in __exit__
    raise self._error
asyncio.exceptions.TimeoutError: Deadline exceeded

Here's what I've uncovered:

The python-sdk starts up two parallel threads:

  • One to handle workflows (runs workflow logic and determines which activity needs to run)
  • One to handle activities (runs activity code)

Both threads use the same gRPC channel to communicate with the Temporal server, with a timeout set to 120s. Concurrent RPC calls are supported according to the grpclib docs: https://grpclib.readthedocs.io/en/latest/client.html

The workflow thread polls the workflow task queue and the activity thread polls the activity task queue. Both take 60 seconds before continuing the while loop to poll again if nothing is returned. When the activity thread receives something on the activity task queue, it starts running the activity code. Meanwhile, the workflow thread is in the middle of polling the workflow task queue.

What I'm noticing is that the workflow poll request is "blocked" and doesn't return after 60 seconds like it usually would. The workflow poll request doesn't complete until the activity in the other thread is finished. If an activity takes long enough to complete, the workflow poll request can take more than 120 seconds (note the timeout mentioned earlier), causing a deadline exceeded error.

Solution:

  • Figure out why the workflow poll request is "stuck" and can't complete while an activity is running in the other thread.

A couple of temporary workarounds:

  • Set the timeout to the max time that we expect activities could take. I don't know if there are consequences to this.

OR

  • Not worry about the deadline exceeded error because when it happens, it immediately continues the while loop and polls the workflow task queue again.
    • The downside to this is having this error clutter the worker logs
@firdaus
Owner

firdaus commented May 18, 2021

Hi @CGreenburg, I'm not able to reproduce this with the code below. Could you share some sample code that produces that error on your side?

import asyncio
import logging
from datetime import timedelta

from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient

logging.basicConfig(level=logging.INFO)

TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"


# Activities Interface
class GreetingActivities:
    @activity_method(task_queue=TASK_QUEUE, schedule_to_close_timeout=timedelta(seconds=500))
    async def compose_greeting(self, greeting: str, name: str) -> str:
        raise NotImplementedError


# Activities Implementation
class GreetingActivitiesImpl:
    async def compose_greeting(self, greeting: str, name: str) -> str:
        await asyncio.sleep(200)
        return greeting + " " + name


# Workflow Interface
class GreetingWorkflow:
    @workflow_method(task_queue=TASK_QUEUE)
    async def get_greeting(self, name: str) -> str:
        raise NotImplementedError


# Workflow Implementation
class GreetingWorkflowImpl(GreetingWorkflow):

    def __init__(self):
        self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities)

    async def get_greeting(self, name):
        return await self.greeting_activities.compose_greeting("Hello!", name)


async def worker_main():
    client = WorkflowClient.new_client(namespace=NAMESPACE)
    factory = WorkerFactory(client, NAMESPACE)
    worker = factory.new_worker(TASK_QUEUE)
    worker.register_activities_implementation(GreetingActivitiesImpl(), "GreetingActivities")
    worker.register_workflow_implementation_type(GreetingWorkflowImpl)
    factory.start()
    print("Worker started")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(worker_main())
    loop.run_forever()

@dkryptr
Contributor Author

dkryptr commented May 18, 2021

We migrated a pyspark/pandas application to a workflow activity and it takes longer than 2 minutes. That's where we've seen the deadline exceeded exception. I've been replicating it using time.sleep(150).
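The difference between `asyncio.sleep(200)` in the earlier snippet and `time.sleep(150)` here can be illustrated without the Temporal SDK. The following is a minimal sketch, assuming (as the replies in this thread suggest) that the poll loop and the activity share one asyncio event loop. The names `poller`, `blocking_activity`, `cooperative_activity`, and `max_gap` are hypothetical and exist only for this illustration; the durations are shortened so it runs in about a second.

```python
import asyncio
import time


async def poller(ticks: list) -> None:
    # Hypothetical stand-in for a poll loop: records a timestamp per iteration.
    for _ in range(5):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def blocking_activity() -> None:
    # time.sleep never yields to the event loop, so the poller cannot run.
    time.sleep(0.5)


async def cooperative_activity() -> None:
    # asyncio.sleep yields control, so the poller keeps running meanwhile.
    await asyncio.sleep(0.5)


async def max_gap(activity) -> float:
    # Run the poller alongside one activity and return the longest pause
    # (in seconds) observed between two consecutive poller iterations.
    ticks: list = []
    poll_task = asyncio.ensure_future(poller(ticks))
    await asyncio.sleep(0)  # let the poller record its first tick
    await activity()
    await poll_task
    return max(b - a for a, b in zip(ticks, ticks[1:]))


if __name__ == "__main__":
    print("blocking gap:", asyncio.run(max_gap(blocking_activity)))
    print("cooperative gap:", asyncio.run(max_gap(cooperative_activity)))
```

In the blocking case the poller is paused for roughly the full duration of the activity, which matches the symptom described above: a pending poll cannot be serviced until the activity returns.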

@firdaus
Owner

firdaus commented May 18, 2021

You might want to look into https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor if your activity is CPU bound or if the APIs that you're using aren't async friendly.
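A minimal sketch of that suggestion, outside the Temporal SDK: the blocking call is handed to `loop.run_in_executor` so the event loop stays free while it runs. `blocking_work` is a hypothetical stand-in for the pyspark/pandas job, and the `seconds` parameter is added only to keep the example short. The method body mirrors `compose_greeting` from the sample above but is written as a plain coroutine here, so this is an illustration under those assumptions rather than the SDK's prescribed pattern.

```python
import asyncio
import time


def blocking_work(seconds: float) -> str:
    # Hypothetical stand-in for a long, non-async call (e.g. a pandas job).
    time.sleep(seconds)
    return "done"


async def compose_greeting(greeting: str, name: str, seconds: float = 0.5) -> str:
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor; the
    # coroutine is suspended here while the worker thread does the waiting.
    await loop.run_in_executor(None, blocking_work, seconds)
    return greeting + " " + name


if __name__ == "__main__":
    print(asyncio.run(compose_greeting("Hello!", "World")))
```

The default executor is thread-based. For CPU-bound pure-Python work, passing a `concurrent.futures.ProcessPoolExecutor` instead of `None` may be a better fit.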

@firdaus
Owner

firdaus commented May 18, 2021

There was a thread about this in the past where I proposed creating an event loop per worker (running in its own thread) so that the activity code is free to block as it pleases, but the feedback I got previously was that the library should do the bare minimum and allow the user to decide how to handle blocking calls.

https://community.temporal.io/t/timeline-for-python-client-support/223/26

@firdaus
Owner

firdaus commented May 18, 2021

If you're able to, I would run my workflow and activities in separate workers.

@firdaus
Owner

firdaus commented May 18, 2021

You can also try something like this:

import asyncio
import logging
from datetime import timedelta
from threading import Thread

from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient
import time

logging.basicConfig(level=logging.INFO)

TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"


# Activities Interface
class GreetingActivities:
    @activity_method(task_queue=TASK_QUEUE, schedule_to_close_timeout=timedelta(seconds=500))
    async def compose_greeting(self, greeting: str, name: str) -> str:
        raise NotImplementedError


# Activities Implementation
class GreetingActivitiesImpl:
    async def compose_greeting(self, greeting: str, name: str) -> str:
        time.sleep(150)  # deliberately blocking, to mimic the reported activity
        return greeting + " " + name


# Workflow Interface
class GreetingWorkflow:
    @workflow_method(task_queue=TASK_QUEUE)
    async def get_greeting(self, name: str) -> str:
        raise NotImplementedError


# Workflow Implementation
class GreetingWorkflowImpl(GreetingWorkflow):

    def __init__(self):
        self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities)

    async def get_greeting(self, name):
        return await self.greeting_activities.compose_greeting("Hello!", name)


async def worker_main(activities=False, workflows=False):
    client = WorkflowClient.new_client(namespace=NAMESPACE)
    factory = WorkerFactory(client, NAMESPACE)
    worker = factory.new_worker(TASK_QUEUE)
    if activities:
        worker.register_activities_implementation(GreetingActivitiesImpl(), "GreetingActivities")
    if workflows:
        worker.register_workflow_implementation_type(GreetingWorkflowImpl)
    factory.start()
    print("Worker started")


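def thread1():
    # Activity worker: runs on its own event loop, so blocking activity code
    # cannot stall the workflow poller running in the other thread.
    asyncio.set_event_loop(asyncio.new_event_loop())
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(worker_main(activities=True, workflows=False))
    loop.run_forever()


def thread2():
    # Workflow worker: polls the workflow task queue on a separate event loop.
    asyncio.set_event_loop(asyncio.new_event_loop())
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(worker_main(activities=False, workflows=True))
    loop.run_forever()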


if __name__ == '__main__':
    t1 = Thread(target=thread1)
    t2 = Thread(target=thread2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

@dkryptr
Contributor Author

dkryptr commented May 24, 2021

Thanks for the suggestions @firdaus!

In your thread example, it's okay that the two workers use the same task queue? They don't need to be different task queues?

@firdaus
Owner

firdaus commented May 24, 2021

It's fine because there are separate gRPC calls behind the scenes for polling for workflow tasks and activity tasks, so a workflow worker will never get the work of an activity worker.

pypt added a commit to mediacloud/backend that referenced this issue Jan 2, 2022
2 participants